2019-10-24 09:47:15

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 00/10] pipe: Notification queue preparation [ver #2]


Here's a set of preparatory patches for building a general notification
queue on top of pipes. It makes a number of significant changes:

(1) It removes the nr_exclusive argument from __wake_up_sync_key() as this
is always 1. This prepares for step 2.

(2) Adds wake_up_interruptible_sync_poll_locked() so that poll can be
woken up from a function that's holding the poll waitqueue spinlock.

(3) Change the pipe buffer ring to be managed in terms of unbounded head
and tail indices rather than bounded index and length. This means
that reading the pipe only needs to modify one index, not two.

(4) A selection of helper functions are provided to query the state of the
pipe buffer, plus a couple to apply updates to the pipe indices.

(5) The pipe ring is allowed to have kernel-reserved slots. This allows
many notification messages to be spliced in by the kernel without
allowing userspace to pin too many pages if it writes to the same
pipe.

(6) Advance the head and tail indices inside the pipe waitqueue lock and
use step 2 to poke poll without having to take the lock twice.

(7) Rearrange pipe_write() to preallocate the buffer it is going to write
into and then drop the spinlock. This allows kernel notifications to
then be added the ring whilst it is filling the buffer it allocated.
The read side is stalled because the pipe mutex is still held.

(8) Don't wake up readers on a pipe if there was already data in it when
we added more.

(9) Don't wake up writers on a pipe if the ring wasn't full before we
removed a buffer.

The patches can be found here also:

http://git.kernel.org/cgit/linux/kernel/git/dhowells/linux-fs.git/log/?h=pipe-experimental

PATCHES BENCHMARK BEST TOTAL BYTES AVG BYTES STDDEV
======= =============== =============== =============== =============== ===============
- pipe 307457969 36348556755 302904639 10622403
- splice 287117614 26933658717 224447155 160777958
- vmsplice 435180375 51302964090 427524700 19083037

rm-nrx pipe 311091179 37093181356 309109844 7221622
rm-nrx splice 285628049 27916298942 232635824 158296431
rm-nrx vmsplice 417703153 47570362546 396419687 33960822

wakesl pipe 310698731 36772541631 306437846 8249347
wakesl splice 286193726 28600435451 238336962 141169318
wakesl vmsplice 436175803 50723895824 422699131 40724240

ht pipe 305534565 36426079543 303550662 5673885
ht splice 243632025 23319439010 194328658 150479853
ht vmsplice 432825176 49101781001 409181508 44102509

k-rsv pipe 308691523 36652267561 305435563 12972559
k-rsv splice 244793528 23625172865 196876440 125319143
k-rsv vmsplice 436119082 49460808579 412173404 55547525

r-adv-t pipe 310094218 36860182219 307168185 8081101
r-adv-t splice 285527382 27085052687 225708772 206918887
r-adv-t vmsplice 336885948 40128756927 334406307 5895935

r-cond pipe 308727804 36635828180 305298568 9976806
r-cond splice 284467568 28445793054 237048275 200284329
r-cond vmsplice 449679489 51134833848 426123615 66790875

w-preal pipe 307416578 36662086426 305517386 6216663
w-preal splice 282655051 28455249109 237127075 194154549
w-preal vmsplice 437002601 47832160621 398601338 96513019

w-redun pipe 307279630 36329750422 302747920 8913567
w-redun splice 284324488 27327152734 227726272 219735663
w-redun vmsplice 451141971 51485257719 429043814 51388217

w-ckful pipe 305055247 36374947350 303124561 5400728
w-ckful splice 281575308 26841554544 223679621 215942886
w-ckful vmsplice 436653588 47564907110 396374225 82255342

The patches column indicates the point in the patchset at which the benchmarks
were taken:

0 No patches
rm-nrx "Remove the nr_exclusive argument from __wake_up_sync_key()"
wakesl "Add wake_up_interruptible_sync_poll_locked()"
ht "pipe: Use head and tail pointers for the ring, not cursor and length"
k-rsv "pipe: Allow pipes to have kernel-reserved slots"
r-adv-t "pipe: Advance tail pointer inside of wait spinlock in pipe_read()"
r-cond "pipe: Conditionalise wakeup in pipe_read()"
w-preal "pipe: Rearrange sequence in pipe_write() to preallocate slot"
w-redun "pipe: Remove redundant wakeup from pipe_write()"
w-ckful "pipe: Check for ring full inside of the spinlock in pipe_write()"

Changes:

ver #2:

(*) Split the notification patches out into a separate branch.

(*) Removed the nr_exclusive parameter from __wake_up_sync_key().

(*) Renamed the locked wakeup function.

(*) Add helpers for empty, full, occupancy.

(*) Split the addition of ->max_usage out into its own patch.

(*) Fixed some bits pointed out by Rasmus Villemoes.

ver #1:

(*) Build on top of standard pipes instead of having a driver.

David
---
David Howells (10):
pipe: Reduce #inclusion of pipe_fs_i.h
Remove the nr_exclusive argument from __wake_up_sync_key()
Add wake_up_interruptible_sync_poll_locked()
pipe: Use head and tail pointers for the ring, not cursor and length
pipe: Allow pipes to have kernel-reserved slots
pipe: Advance tail pointer inside of wait spinlock in pipe_read()
pipe: Conditionalise wakeup in pipe_read()
pipe: Rearrange sequence in pipe_write() to preallocate slot
pipe: Remove redundant wakeup from pipe_write()
pipe: Check for ring full inside of the spinlock in pipe_write()


fs/exec.c | 1
fs/fuse/dev.c | 31 +++--
fs/ocfs2/aops.c | 1
fs/pipe.c | 225 ++++++++++++++++++++++---------------
fs/splice.c | 188 +++++++++++++++++++------------
include/linux/pipe_fs_i.h | 90 ++++++++++++++-
include/linux/uio.h | 4 -
include/linux/wait.h | 11 +-
kernel/exit.c | 2
kernel/sched/wait.c | 37 ++++--
lib/iov_iter.c | 266 +++++++++++++++++++++++++-------------------
security/smack/smack_lsm.c | 1
12 files changed, 541 insertions(+), 316 deletions(-)


2019-10-24 09:47:57

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 01/10] pipe: Reduce #inclusion of pipe_fs_i.h [ver #2]

Remove some #inclusions of linux/pipe_fs_i.h that don't seem to be
necessary any more.

Signed-off-by: David Howells <[email protected]>
---

fs/exec.c | 1 -
fs/ocfs2/aops.c | 1 -
security/smack/smack_lsm.c | 1 -
3 files changed, 3 deletions(-)

diff --git a/fs/exec.c b/fs/exec.c
index 555e93c7dec8..57bc7ef8d31b 100644
--- a/fs/exec.c
+++ b/fs/exec.c
@@ -59,7 +59,6 @@
#include <linux/kmod.h>
#include <linux/fsnotify.h>
#include <linux/fs_struct.h>
-#include <linux/pipe_fs_i.h>
#include <linux/oom.h>
#include <linux/compat.h>
#include <linux/vmalloc.h>
diff --git a/fs/ocfs2/aops.c b/fs/ocfs2/aops.c
index 8de1c9d644f6..c50ac6b7415b 100644
--- a/fs/ocfs2/aops.c
+++ b/fs/ocfs2/aops.c
@@ -11,7 +11,6 @@
#include <linux/pagemap.h>
#include <asm/byteorder.h>
#include <linux/swap.h>
-#include <linux/pipe_fs_i.h>
#include <linux/mpage.h>
#include <linux/quotaops.h>
#include <linux/blkdev.h>
diff --git a/security/smack/smack_lsm.c b/security/smack/smack_lsm.c
index abeb09c30633..ecea41ce919b 100644
--- a/security/smack/smack_lsm.c
+++ b/security/smack/smack_lsm.c
@@ -28,7 +28,6 @@
#include <linux/icmpv6.h>
#include <linux/slab.h>
#include <linux/mutex.h>
-#include <linux/pipe_fs_i.h>
#include <net/cipso_ipv4.h>
#include <net/ip.h>
#include <net/ipv6.h>

2019-10-24 09:55:04

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 07/10] pipe: Conditionalise wakeup in pipe_read() [ver #2]

Only do a wakeup in pipe_read() if we made space in a completely full
buffer. The producer shouldn't be waiting on pipe->wait otherwise.

Signed-off-by: David Howells <[email protected]>
---

fs/pipe.c | 15 ++++++---------
1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 1274305772fb..e3a8f10750c9 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -327,11 +327,13 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
spin_lock_irq(&pipe->wait.lock);
tail++;
pipe_commit_read(pipe, tail);
- do_wakeup = 0;
- wake_up_interruptible_sync_poll_locked(
- &pipe->wait, EPOLLOUT | EPOLLWRNORM);
+ do_wakeup = 1;
+ if (head - (tail - 1) == pipe->max_usage)
+ wake_up_interruptible_sync_poll_locked(
+ &pipe->wait, EPOLLOUT | EPOLLWRNORM);
spin_unlock_irq(&pipe->wait.lock);
- kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
+ if (head - (tail - 1) == pipe->max_usage)
+ kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}
total_len -= chars;
if (!total_len)
@@ -360,11 +362,6 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
ret = -ERESTARTSYS;
break;
}
- if (do_wakeup) {
- wake_up_interruptible_sync_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
- kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
- do_wakeup = 0;
- }
pipe_wait(pipe);
}
__pipe_unlock(pipe);

2019-10-24 09:57:20

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 10/10] pipe: Check for ring full inside of the spinlock in pipe_write() [ver #2]

Make pipe_write() check to see if the ring has become full between it
taking the pipe mutex, checking the ring status and then taking the
spinlock.

This can happen if a notification is written into the pipe as that happens
without the pipe mutex.

Signed-off-by: David Howells <[email protected]>
---

fs/pipe.c | 5 +++++
1 file changed, 5 insertions(+)

diff --git a/fs/pipe.c b/fs/pipe.c
index 3df93990dd9d..6a982a88f658 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -462,6 +462,11 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
spin_lock_irq(&pipe->wait.lock);

head = pipe->head;
+ if (pipe_full(head, pipe->tail, max_usage)) {
+ spin_unlock_irq(&pipe->wait.lock);
+ continue;
+ }
+
pipe_commit_write(pipe, head + 1);

/* Always wake up, even if the copy fails. Otherwise

2019-10-24 14:14:46

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

Convert pipes to use head and tail pointers for the buffer ring rather than
pointer and length as the latter requires two atomic ops to update (or a
combined op) whereas the former only requires one.

(1) The head pointer is the point at which production occurs and points to
the slot in which the next buffer will be placed. This is equivalent
to pipe->curbuf + pipe->nrbufs.

The head pointer belongs to the write-side.

(2) The tail pointer is the point at which consumption occurs. It points
to the next slot to be consumed. This is equivalent to pipe->curbuf.

The tail pointer belongs to the read-side.

(3) head and tail are allowed to run to UINT_MAX and wrap naturally. They
are only masked off when the array is being accessed, e.g.:

pipe->bufs[head & mask]

This means that it is not necessary to have a dead slot in the ring as
head == tail isn't ambiguous.

(4) The ring is empty if "head == tail".

A helper, pipe_empty(), is provided for this.

(5) The occupancy of the ring is "head - tail".

A helper, pipe_occupancy(), is provided for this.

(6) The number of free slots in the ring is "pipe->ring_size - occupancy".

A helper, pipe_space_for_user() is provided to indicate how many slots
userspace may use.

(7) The ring is full if "head - tail >= pipe->ring_size".

A helper, pipe_full(), is provided for this.

Signed-off-by: David Howells <[email protected]>
---

fs/fuse/dev.c | 31 +++--
fs/pipe.c | 169 ++++++++++++++++-------------
fs/splice.c | 188 ++++++++++++++++++++------------
include/linux/pipe_fs_i.h | 86 ++++++++++++++-
include/linux/uio.h | 4 -
lib/iov_iter.c | 266 +++++++++++++++++++++++++--------------------
6 files changed, 464 insertions(+), 280 deletions(-)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index dadd617d826c..1e4bc27573cc 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -703,7 +703,7 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
cs->pipebufs++;
cs->nr_segs--;
} else {
- if (cs->nr_segs == cs->pipe->buffers)
+ if (cs->nr_segs >= cs->pipe->ring_size)
return -EIO;

page = alloc_page(GFP_HIGHUSER);
@@ -879,7 +879,7 @@ static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page,
struct pipe_buffer *buf;
int err;

- if (cs->nr_segs == cs->pipe->buffers)
+ if (cs->nr_segs >= cs->pipe->ring_size)
return -EIO;

err = unlock_request(cs->req);
@@ -1341,7 +1341,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
if (!fud)
return -EPERM;

- bufs = kvmalloc_array(pipe->buffers, sizeof(struct pipe_buffer),
+ bufs = kvmalloc_array(pipe->ring_size, sizeof(struct pipe_buffer),
GFP_KERNEL);
if (!bufs)
return -ENOMEM;
@@ -1353,7 +1353,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
if (ret < 0)
goto out;

- if (pipe->nrbufs + cs.nr_segs > pipe->buffers) {
+ if (pipe_occupancy(pipe->head, pipe->tail) + cs.nr_segs > pipe->ring_size) {
ret = -EIO;
goto out;
}
@@ -1935,6 +1935,7 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
struct file *out, loff_t *ppos,
size_t len, unsigned int flags)
{
+ unsigned int head, tail, mask, count;
unsigned nbuf;
unsigned idx;
struct pipe_buffer *bufs;
@@ -1949,8 +1950,12 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,

pipe_lock(pipe);

- bufs = kvmalloc_array(pipe->nrbufs, sizeof(struct pipe_buffer),
- GFP_KERNEL);
+ head = pipe->head;
+ tail = pipe->tail;
+ mask = pipe->ring_size - 1;
+ count = head - tail;
+
+ bufs = kvmalloc_array(count, sizeof(struct pipe_buffer), GFP_KERNEL);
if (!bufs) {
pipe_unlock(pipe);
return -ENOMEM;
@@ -1958,8 +1963,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,

nbuf = 0;
rem = 0;
- for (idx = 0; idx < pipe->nrbufs && rem < len; idx++)
- rem += pipe->bufs[(pipe->curbuf + idx) & (pipe->buffers - 1)].len;
+ for (idx = tail; idx < head && rem < len; idx++)
+ rem += pipe->bufs[idx & mask].len;

ret = -EINVAL;
if (rem < len)
@@ -1970,16 +1975,16 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
struct pipe_buffer *ibuf;
struct pipe_buffer *obuf;

- BUG_ON(nbuf >= pipe->buffers);
- BUG_ON(!pipe->nrbufs);
- ibuf = &pipe->bufs[pipe->curbuf];
+ BUG_ON(nbuf >= pipe->ring_size);
+ BUG_ON(tail == head);
+ ibuf = &pipe->bufs[tail & mask];
obuf = &bufs[nbuf];

if (rem >= ibuf->len) {
*obuf = *ibuf;
ibuf->ops = NULL;
- pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
- pipe->nrbufs--;
+ tail++;
+ pipe_commit_read(pipe, tail);
} else {
if (!pipe_buf_get(pipe, ibuf))
goto out_free;
diff --git a/fs/pipe.c b/fs/pipe.c
index 8a2ab2f974bd..8a0806fe12d3 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -43,10 +43,11 @@ unsigned long pipe_user_pages_hard;
unsigned long pipe_user_pages_soft = PIPE_DEF_BUFFERS * INR_OPEN_CUR;

/*
- * We use a start+len construction, which provides full use of the
- * allocated memory.
- * -- Florian Coosmann (FGC)
- *
+ * We use head and tail indices that aren't masked off, except at the point of
+ * dereference, but rather they're allowed to wrap naturally. This means there
+ * isn't a dead spot in the buffer, provided the ring size < INT_MAX.
+ * -- David Howells 2019-09-23.
+ *
* Reads with count = 0 should always return 0.
* -- Julian Bradfield 1999-06-07.
*
@@ -285,10 +286,12 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
ret = 0;
__pipe_lock(pipe);
for (;;) {
- int bufs = pipe->nrbufs;
- if (bufs) {
- int curbuf = pipe->curbuf;
- struct pipe_buffer *buf = pipe->bufs + curbuf;
+ unsigned int head = pipe->head;
+ unsigned int tail = pipe->tail;
+ unsigned int mask = pipe->ring_size - 1;
+
+ if (!pipe_empty(head, tail)) {
+ struct pipe_buffer *buf = &pipe->bufs[tail & mask];
size_t chars = buf->len;
size_t written;
int error;
@@ -321,17 +324,17 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)

if (!buf->len) {
pipe_buf_release(pipe, buf);
- curbuf = (curbuf + 1) & (pipe->buffers - 1);
- pipe->curbuf = curbuf;
- pipe->nrbufs = --bufs;
+ tail++;
+ pipe_commit_read(pipe, tail);
do_wakeup = 1;
}
total_len -= chars;
if (!total_len)
break; /* common path: read succeeded */
+ if (!pipe_empty(head, tail)) /* More to do? */
+ continue;
}
- if (bufs) /* More to do? */
- continue;
+
if (!pipe->writers)
break;
if (!pipe->waiting_writers) {
@@ -380,6 +383,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
{
struct file *filp = iocb->ki_filp;
struct pipe_inode_info *pipe = filp->private_data;
+ unsigned int head, tail, max_usage, mask;
ssize_t ret = 0;
int do_wakeup = 0;
size_t total_len = iov_iter_count(from);
@@ -397,12 +401,15 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
goto out;
}

+ tail = pipe->tail;
+ head = pipe->head;
+ max_usage = pipe->ring_size;
+ mask = pipe->ring_size - 1;
+
/* We try to merge small writes */
chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */
- if (pipe->nrbufs && chars != 0) {
- int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) &
- (pipe->buffers - 1);
- struct pipe_buffer *buf = pipe->bufs + lastbuf;
+ if (!pipe_empty(head, tail) && chars != 0) {
+ struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask];
int offset = buf->offset + buf->len;

if (pipe_buf_can_merge(buf) && offset + chars <= PAGE_SIZE) {
@@ -423,18 +430,16 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
}

for (;;) {
- int bufs;
-
if (!pipe->readers) {
send_sig(SIGPIPE, current, 0);
if (!ret)
ret = -EPIPE;
break;
}
- bufs = pipe->nrbufs;
- if (bufs < pipe->buffers) {
- int newbuf = (pipe->curbuf + bufs) & (pipe->buffers-1);
- struct pipe_buffer *buf = pipe->bufs + newbuf;
+
+ tail = pipe->tail;
+ if (!pipe_full(head, tail, max_usage)) {
+ struct pipe_buffer *buf = &pipe->bufs[head & mask];
struct page *page = pipe->tmp_page;
int copied;

@@ -470,14 +475,19 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
buf->ops = &packet_pipe_buf_ops;
buf->flags = PIPE_BUF_FLAG_PACKET;
}
- pipe->nrbufs = ++bufs;
+
+ head++;
+ pipe_commit_write(pipe, head);
pipe->tmp_page = NULL;

if (!iov_iter_count(from))
break;
}
- if (bufs < pipe->buffers)
+
+ if (!pipe_full(head, tail, max_usage))
continue;
+
+ /* Wait for buffer space to become available. */
if (filp->f_flags & O_NONBLOCK) {
if (!ret)
ret = -EAGAIN;
@@ -515,17 +525,19 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
static long pipe_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
{
struct pipe_inode_info *pipe = filp->private_data;
- int count, buf, nrbufs;
+ int count, head, tail, mask;

switch (cmd) {
case FIONREAD:
__pipe_lock(pipe);
count = 0;
- buf = pipe->curbuf;
- nrbufs = pipe->nrbufs;
- while (--nrbufs >= 0) {
- count += pipe->bufs[buf].len;
- buf = (buf+1) & (pipe->buffers - 1);
+ head = pipe->head;
+ tail = pipe->tail;
+ mask = pipe->ring_size - 1;
+
+ while (tail != head) {
+ count += pipe->bufs[tail & mask].len;
+ tail++;
}
__pipe_unlock(pipe);

@@ -541,21 +553,25 @@ pipe_poll(struct file *filp, poll_table *wait)
{
__poll_t mask;
struct pipe_inode_info *pipe = filp->private_data;
- int nrbufs;
+ unsigned int head = READ_ONCE(pipe->head);
+ unsigned int tail = READ_ONCE(pipe->tail);

poll_wait(filp, &pipe->wait, wait);

+ BUG_ON(pipe_occupancy(head, tail) > pipe->ring_size);
+
/* Reading only -- no need for acquiring the semaphore. */
- nrbufs = pipe->nrbufs;
mask = 0;
if (filp->f_mode & FMODE_READ) {
- mask = (nrbufs > 0) ? EPOLLIN | EPOLLRDNORM : 0;
+ if (!pipe_empty(head, tail))
+ mask |= EPOLLIN | EPOLLRDNORM;
if (!pipe->writers && filp->f_version != pipe->w_counter)
mask |= EPOLLHUP;
}

if (filp->f_mode & FMODE_WRITE) {
- mask |= (nrbufs < pipe->buffers) ? EPOLLOUT | EPOLLWRNORM : 0;
+ if (!pipe_full(head, tail, pipe->ring_size))
+ mask |= EPOLLOUT | EPOLLWRNORM;
/*
* Most Unices do not set EPOLLERR for FIFOs but on Linux they
* behave exactly like pipes for poll().
@@ -679,7 +695,7 @@ struct pipe_inode_info *alloc_pipe_info(void)
if (pipe->bufs) {
init_waitqueue_head(&pipe->wait);
pipe->r_counter = pipe->w_counter = 1;
- pipe->buffers = pipe_bufs;
+ pipe->ring_size = pipe_bufs;
pipe->user = user;
mutex_init(&pipe->mutex);
return pipe;
@@ -697,9 +713,9 @@ void free_pipe_info(struct pipe_inode_info *pipe)
{
int i;

- (void) account_pipe_buffers(pipe->user, pipe->buffers, 0);
+ (void) account_pipe_buffers(pipe->user, pipe->ring_size, 0);
free_uid(pipe->user);
- for (i = 0; i < pipe->buffers; i++) {
+ for (i = 0; i < pipe->ring_size; i++) {
struct pipe_buffer *buf = pipe->bufs + i;
if (buf->ops)
pipe_buf_release(pipe, buf);
@@ -880,7 +896,7 @@ SYSCALL_DEFINE1(pipe, int __user *, fildes)

static int wait_for_partner(struct pipe_inode_info *pipe, unsigned int *cnt)
{
- int cur = *cnt;
+ int cur = *cnt;

while (cur == *cnt) {
pipe_wait(pipe);
@@ -955,7 +971,7 @@ static int fifo_open(struct inode *inode, struct file *filp)
}
}
break;
-
+
case FMODE_WRITE:
/*
* O_WRONLY
@@ -975,7 +991,7 @@ static int fifo_open(struct inode *inode, struct file *filp)
goto err_wr;
}
break;
-
+
case FMODE_READ | FMODE_WRITE:
/*
* O_RDWR
@@ -1054,14 +1070,14 @@ unsigned int round_pipe_size(unsigned long size)
static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
{
struct pipe_buffer *bufs;
- unsigned int size, nr_pages;
+ unsigned int size, nr_slots, head, tail, mask, n;
unsigned long user_bufs;
long ret = 0;

size = round_pipe_size(arg);
- nr_pages = size >> PAGE_SHIFT;
+ nr_slots = size >> PAGE_SHIFT;

- if (!nr_pages)
+ if (!nr_slots)
return -EINVAL;

/*
@@ -1071,13 +1087,13 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
* Decreasing the pipe capacity is always permitted, even
* if the user is currently over a limit.
*/
- if (nr_pages > pipe->buffers &&
+ if (nr_slots > pipe->ring_size &&
size > pipe_max_size && !capable(CAP_SYS_RESOURCE))
return -EPERM;

- user_bufs = account_pipe_buffers(pipe->user, pipe->buffers, nr_pages);
+ user_bufs = account_pipe_buffers(pipe->user, pipe->ring_size, nr_slots);

- if (nr_pages > pipe->buffers &&
+ if (nr_slots > pipe->ring_size &&
(too_many_pipe_buffers_hard(user_bufs) ||
too_many_pipe_buffers_soft(user_bufs)) &&
is_unprivileged_user()) {
@@ -1086,17 +1102,21 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
}

/*
- * We can shrink the pipe, if arg >= pipe->nrbufs. Since we don't
- * expect a lot of shrink+grow operations, just free and allocate
- * again like we would do for growing. If the pipe currently
+ * We can shrink the pipe, if arg is greater than the ring occupancy.
+ * Since we don't expect a lot of shrink+grow operations, just free and
+ * allocate again like we would do for growing. If the pipe currently
* contains more buffers than arg, then return busy.
*/
- if (nr_pages < pipe->nrbufs) {
+ mask = pipe->ring_size - 1;
+ head = pipe->head;
+ tail = pipe->tail;
+ n = pipe_occupancy(pipe->head, pipe->tail);
+ if (nr_slots < n) {
ret = -EBUSY;
goto out_revert_acct;
}

- bufs = kcalloc(nr_pages, sizeof(*bufs),
+ bufs = kcalloc(nr_slots, sizeof(*bufs),
GFP_KERNEL_ACCOUNT | __GFP_NOWARN);
if (unlikely(!bufs)) {
ret = -ENOMEM;
@@ -1105,33 +1125,36 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)

/*
* The pipe array wraps around, so just start the new one at zero
- * and adjust the indexes.
+ * and adjust the indices.
*/
- if (pipe->nrbufs) {
- unsigned int tail;
- unsigned int head;
-
- tail = pipe->curbuf + pipe->nrbufs;
- if (tail < pipe->buffers)
- tail = 0;
- else
- tail &= (pipe->buffers - 1);
-
- head = pipe->nrbufs - tail;
- if (head)
- memcpy(bufs, pipe->bufs + pipe->curbuf, head * sizeof(struct pipe_buffer));
- if (tail)
- memcpy(bufs + head, pipe->bufs, tail * sizeof(struct pipe_buffer));
+ if (n > 0) {
+ unsigned int h = head & mask;
+ unsigned int t = tail & mask;
+ if (h > t) {
+ memcpy(bufs, pipe->bufs + t,
+ n * sizeof(struct pipe_buffer));
+ } else {
+ unsigned int tsize = pipe->ring_size - t;
+ if (h > 0)
+ memcpy(bufs + tsize, pipe->bufs,
+ h * sizeof(struct pipe_buffer));
+ memcpy(bufs, pipe->bufs + t,
+ tsize * sizeof(struct pipe_buffer));
+ }
}

- pipe->curbuf = 0;
+ head = n;
+ tail = 0;
+
kfree(pipe->bufs);
pipe->bufs = bufs;
- pipe->buffers = nr_pages;
- return nr_pages * PAGE_SIZE;
+ pipe->ring_size = nr_slots;
+ pipe->tail = tail;
+ pipe->head = head;
+ return pipe->ring_size * PAGE_SIZE;

out_revert_acct:
- (void) account_pipe_buffers(pipe->user, nr_pages, pipe->buffers);
+ (void) account_pipe_buffers(pipe->user, nr_slots, pipe->ring_size);
return ret;
}

@@ -1161,7 +1184,7 @@ long pipe_fcntl(struct file *file, unsigned int cmd, unsigned long arg)
ret = pipe_set_size(pipe, arg);
break;
case F_GETPIPE_SZ:
- ret = pipe->buffers * PAGE_SIZE;
+ ret = pipe->ring_size * PAGE_SIZE;
break;
default:
ret = -EINVAL;
diff --git a/fs/splice.c b/fs/splice.c
index 98412721f056..bbc025236ff9 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -185,6 +185,9 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
struct splice_pipe_desc *spd)
{
unsigned int spd_pages = spd->nr_pages;
+ unsigned int tail = pipe->tail;
+ unsigned int head = pipe->head;
+ unsigned int mask = pipe->ring_size - 1;
int ret = 0, page_nr = 0;

if (!spd_pages)
@@ -196,9 +199,8 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
goto out;
}

- while (pipe->nrbufs < pipe->buffers) {
- int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
- struct pipe_buffer *buf = pipe->bufs + newbuf;
+ while (!pipe_full(head, tail, pipe->ring_size)) {
+ struct pipe_buffer *buf = &pipe->bufs[head & mask];

buf->page = spd->pages[page_nr];
buf->offset = spd->partial[page_nr].offset;
@@ -207,7 +209,8 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
buf->ops = spd->ops;
buf->flags = 0;

- pipe->nrbufs++;
+ head++;
+ pipe_commit_write(pipe, head);
page_nr++;
ret += buf->len;

@@ -228,17 +231,19 @@ EXPORT_SYMBOL_GPL(splice_to_pipe);

ssize_t add_to_pipe(struct pipe_inode_info *pipe, struct pipe_buffer *buf)
{
+ unsigned int head = pipe->head;
+ unsigned int tail = pipe->tail;
+ unsigned int mask = pipe->ring_size - 1;
int ret;

if (unlikely(!pipe->readers)) {
send_sig(SIGPIPE, current, 0);
ret = -EPIPE;
- } else if (pipe->nrbufs == pipe->buffers) {
+ } else if (pipe_full(head, tail, pipe->ring_size)) {
ret = -EAGAIN;
} else {
- int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
- pipe->bufs[newbuf] = *buf;
- pipe->nrbufs++;
+ pipe->bufs[head & mask] = *buf;
+ pipe_commit_write(pipe, head + 1);
return buf->len;
}
pipe_buf_release(pipe, buf);
@@ -252,14 +257,14 @@ EXPORT_SYMBOL(add_to_pipe);
*/
int splice_grow_spd(const struct pipe_inode_info *pipe, struct splice_pipe_desc *spd)
{
- unsigned int buffers = READ_ONCE(pipe->buffers);
+ unsigned int max_usage = READ_ONCE(pipe->ring_size);

- spd->nr_pages_max = buffers;
- if (buffers <= PIPE_DEF_BUFFERS)
+ spd->nr_pages_max = max_usage;
+ if (max_usage <= PIPE_DEF_BUFFERS)
return 0;

- spd->pages = kmalloc_array(buffers, sizeof(struct page *), GFP_KERNEL);
- spd->partial = kmalloc_array(buffers, sizeof(struct partial_page),
+ spd->pages = kmalloc_array(max_usage, sizeof(struct page *), GFP_KERNEL);
+ spd->partial = kmalloc_array(max_usage, sizeof(struct partial_page),
GFP_KERNEL);

if (spd->pages && spd->partial)
@@ -298,10 +303,11 @@ ssize_t generic_file_splice_read(struct file *in, loff_t *ppos,
{
struct iov_iter to;
struct kiocb kiocb;
- int idx, ret;
+ unsigned int i_head;
+ int ret;

iov_iter_pipe(&to, READ, pipe, len);
- idx = to.idx;
+ i_head = to.head;
init_sync_kiocb(&kiocb, in);
kiocb.ki_pos = *ppos;
ret = call_read_iter(in, &kiocb, &to);
@@ -309,7 +315,7 @@ ssize_t generic_file_splice_read(struct file *in, loff_t *ppos,
*ppos = kiocb.ki_pos;
file_accessed(in);
} else if (ret < 0) {
- to.idx = idx;
+ to.head = i_head;
to.iov_offset = 0;
iov_iter_advance(&to, 0); /* to free what was emitted */
/*
@@ -370,11 +376,12 @@ static ssize_t default_file_splice_read(struct file *in, loff_t *ppos,
struct iov_iter to;
struct page **pages;
unsigned int nr_pages;
+ unsigned int mask;
size_t offset, base, copied = 0;
ssize_t res;
int i;

- if (pipe->nrbufs == pipe->buffers)
+ if (pipe_full(pipe->head, pipe->tail, pipe->ring_size))
return -EAGAIN;

/*
@@ -400,8 +407,9 @@ static ssize_t default_file_splice_read(struct file *in, loff_t *ppos,
}
}

- pipe->bufs[to.idx].offset = offset;
- pipe->bufs[to.idx].len -= offset;
+ mask = pipe->ring_size - 1;
+ pipe->bufs[to.head & mask].offset = offset;
+ pipe->bufs[to.head & mask].len -= offset;

for (i = 0; i < nr_pages; i++) {
size_t this_len = min_t(size_t, len, PAGE_SIZE - offset);
@@ -443,7 +451,8 @@ static int pipe_to_sendpage(struct pipe_inode_info *pipe,

more = (sd->flags & SPLICE_F_MORE) ? MSG_MORE : 0;

- if (sd->len < sd->total_len && pipe->nrbufs > 1)
+ if (sd->len < sd->total_len &&
+ pipe_occupancy(pipe->head, pipe->tail) > 1)
more |= MSG_SENDPAGE_NOTLAST;

return file->f_op->sendpage(file, buf->page, buf->offset,
@@ -481,10 +490,13 @@ static void wakeup_pipe_writers(struct pipe_inode_info *pipe)
static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_desc *sd,
splice_actor *actor)
{
+ unsigned int head = pipe->head;
+ unsigned int tail = pipe->tail;
+ unsigned int mask = pipe->ring_size - 1;
int ret;

- while (pipe->nrbufs) {
- struct pipe_buffer *buf = pipe->bufs + pipe->curbuf;
+ while (!pipe_empty(tail, head)) {
+ struct pipe_buffer *buf = &pipe->bufs[tail & mask];

sd->len = buf->len;
if (sd->len > sd->total_len)
@@ -511,8 +523,8 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des

if (!buf->len) {
pipe_buf_release(pipe, buf);
- pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
- pipe->nrbufs--;
+ tail++;
+ pipe_commit_read(pipe, tail);
if (pipe->files)
sd->need_wakeup = true;
}
@@ -543,7 +555,7 @@ static int splice_from_pipe_next(struct pipe_inode_info *pipe, struct splice_des
if (signal_pending(current))
return -ERESTARTSYS;

- while (!pipe->nrbufs) {
+ while (pipe_empty(pipe->head, pipe->tail)) {
if (!pipe->writers)
return 0;

@@ -686,7 +698,7 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
.pos = *ppos,
.u.file = out,
};
- int nbufs = pipe->buffers;
+ int nbufs = pipe->ring_size;
struct bio_vec *array = kcalloc(nbufs, sizeof(struct bio_vec),
GFP_KERNEL);
ssize_t ret;
@@ -699,16 +711,19 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
splice_from_pipe_begin(&sd);
while (sd.total_len) {
struct iov_iter from;
+ unsigned int head = pipe->head;
+ unsigned int tail = pipe->tail;
+ unsigned int mask = pipe->ring_size - 1;
size_t left;
- int n, idx;
+ int n;

ret = splice_from_pipe_next(pipe, &sd);
if (ret <= 0)
break;

- if (unlikely(nbufs < pipe->buffers)) {
+ if (unlikely(nbufs < pipe->ring_size)) {
kfree(array);
- nbufs = pipe->buffers;
+ nbufs = pipe->ring_size;
array = kcalloc(nbufs, sizeof(struct bio_vec),
GFP_KERNEL);
if (!array) {
@@ -719,16 +734,13 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,

/* build the vector */
left = sd.total_len;
- for (n = 0, idx = pipe->curbuf; left && n < pipe->nrbufs; n++, idx++) {
- struct pipe_buffer *buf = pipe->bufs + idx;
+ for (n = 0; !pipe_empty(head, tail) && left && n < nbufs; tail++, n++) {
+ struct pipe_buffer *buf = &pipe->bufs[tail & mask];
size_t this_len = buf->len;

if (this_len > left)
this_len = left;

- if (idx == pipe->buffers - 1)
- idx = -1;
-
ret = pipe_buf_confirm(pipe, buf);
if (unlikely(ret)) {
if (ret == -ENODATA)
@@ -752,14 +764,15 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
*ppos = sd.pos;

/* dismiss the fully eaten buffers, adjust the partial one */
+ tail = pipe->tail;
while (ret) {
- struct pipe_buffer *buf = pipe->bufs + pipe->curbuf;
+ struct pipe_buffer *buf = &pipe->bufs[tail & mask];
if (ret >= buf->len) {
ret -= buf->len;
buf->len = 0;
pipe_buf_release(pipe, buf);
- pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
- pipe->nrbufs--;
+ tail++;
+ pipe_commit_read(pipe, tail);
if (pipe->files)
sd.need_wakeup = true;
} else {
@@ -942,15 +955,17 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
sd->flags &= ~SPLICE_F_NONBLOCK;
more = sd->flags & SPLICE_F_MORE;

- WARN_ON_ONCE(pipe->nrbufs != 0);
+ WARN_ON_ONCE(!pipe_empty(pipe->head, pipe->tail));

while (len) {
+ unsigned int p_space;
size_t read_len;
loff_t pos = sd->pos, prev_pos = pos;

/* Don't try to read more the pipe has space for. */
- read_len = min_t(size_t, len,
- (pipe->buffers - pipe->nrbufs) << PAGE_SHIFT);
+ p_space = pipe->ring_size -
+ pipe_occupancy(pipe->head, pipe->tail);
+ read_len = min_t(size_t, len, p_space << PAGE_SHIFT);
ret = do_splice_to(in, &pos, pipe, read_len, flags);
if (unlikely(ret <= 0))
goto out_release;
@@ -989,7 +1004,7 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
}

done:
- pipe->nrbufs = pipe->curbuf = 0;
+ pipe->tail = pipe->head = 0;
file_accessed(in);
return bytes;

@@ -998,8 +1013,8 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
* If we did an incomplete transfer we must release
* the pipe buffers in question:
*/
- for (i = 0; i < pipe->buffers; i++) {
- struct pipe_buffer *buf = pipe->bufs + i;
+ for (i = 0; i < pipe->ring_size; i++) {
+ struct pipe_buffer *buf = &pipe->bufs[i];

if (buf->ops)
pipe_buf_release(pipe, buf);
@@ -1075,7 +1090,7 @@ static int wait_for_space(struct pipe_inode_info *pipe, unsigned flags)
send_sig(SIGPIPE, current, 0);
return -EPIPE;
}
- if (pipe->nrbufs != pipe->buffers)
+ if (!pipe_full(pipe->head, pipe->tail, pipe->ring_size))
return 0;
if (flags & SPLICE_F_NONBLOCK)
return -EAGAIN;
@@ -1442,16 +1457,16 @@ static int ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
int ret;

/*
- * Check ->nrbufs without the inode lock first. This function
+ * Check the pipe occupancy without the inode lock first. This function
* is speculative anyways, so missing one is ok.
*/
- if (pipe->nrbufs)
+ if (!pipe_empty(pipe->head, pipe->tail))
return 0;

ret = 0;
pipe_lock(pipe);

- while (!pipe->nrbufs) {
+ while (pipe_empty(pipe->head, pipe->tail)) {
if (signal_pending(current)) {
ret = -ERESTARTSYS;
break;
@@ -1483,13 +1498,13 @@ static int opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
* Check ->nrbufs without the inode lock first. This function
* is speculative anyways, so missing one is ok.
*/
- if (pipe->nrbufs < pipe->buffers)
+ if (pipe_full(pipe->head, pipe->tail, pipe->ring_size))
return 0;

ret = 0;
pipe_lock(pipe);

- while (pipe->nrbufs >= pipe->buffers) {
+ while (pipe_full(pipe->head, pipe->tail, pipe->ring_size)) {
if (!pipe->readers) {
send_sig(SIGPIPE, current, 0);
ret = -EPIPE;
@@ -1520,7 +1535,10 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
size_t len, unsigned int flags)
{
struct pipe_buffer *ibuf, *obuf;
- int ret = 0, nbuf;
+ unsigned int i_head, o_head;
+ unsigned int i_tail, o_tail;
+ unsigned int i_mask, o_mask;
+ int ret = 0;
bool input_wakeup = false;


@@ -1540,7 +1558,14 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
*/
pipe_double_lock(ipipe, opipe);

+ i_tail = ipipe->tail;
+ i_mask = ipipe->ring_size - 1;
+ o_head = opipe->head;
+ o_mask = opipe->ring_size - 1;
+
do {
+ size_t o_len;
+
if (!opipe->readers) {
send_sig(SIGPIPE, current, 0);
if (!ret)
@@ -1548,14 +1573,18 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
break;
}

- if (!ipipe->nrbufs && !ipipe->writers)
+ i_head = ipipe->head;
+ o_tail = opipe->tail;
+
+ if (pipe_empty(i_head, i_tail) && !ipipe->writers)
break;

/*
* Cannot make any progress, because either the input
* pipe is empty or the output pipe is full.
*/
- if (!ipipe->nrbufs || opipe->nrbufs >= opipe->buffers) {
+ if (pipe_empty(i_head, i_tail) ||
+ pipe_full(o_head, o_tail, opipe->ring_size)) {
/* Already processed some buffers, break */
if (ret)
break;
@@ -1575,9 +1604,8 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
goto retry;
}

- ibuf = ipipe->bufs + ipipe->curbuf;
- nbuf = (opipe->curbuf + opipe->nrbufs) & (opipe->buffers - 1);
- obuf = opipe->bufs + nbuf;
+ ibuf = &ipipe->bufs[i_tail & i_mask];
+ obuf = &opipe->bufs[o_head & o_mask];

if (len >= ibuf->len) {
/*
@@ -1585,10 +1613,12 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
*/
*obuf = *ibuf;
ibuf->ops = NULL;
- opipe->nrbufs++;
- ipipe->curbuf = (ipipe->curbuf + 1) & (ipipe->buffers - 1);
- ipipe->nrbufs--;
+ i_tail++;
+ pipe_commit_read(ipipe, i_tail);
input_wakeup = true;
+ o_len = obuf->len;
+ o_head++;
+ pipe_commit_write(opipe, o_head);
} else {
/*
* Get a reference to this pipe buffer,
@@ -1610,12 +1640,14 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
pipe_buf_mark_unmergeable(obuf);

obuf->len = len;
- opipe->nrbufs++;
- ibuf->offset += obuf->len;
- ibuf->len -= obuf->len;
+ ibuf->offset += len;
+ ibuf->len -= len;
+ o_len = len;
+ o_head++;
+ pipe_commit_write(opipe, o_head);
}
- ret += obuf->len;
- len -= obuf->len;
+ ret += o_len;
+ len -= o_len;
} while (len);

pipe_unlock(ipipe);
@@ -1641,7 +1673,10 @@ static int link_pipe(struct pipe_inode_info *ipipe,
size_t len, unsigned int flags)
{
struct pipe_buffer *ibuf, *obuf;
- int ret = 0, i = 0, nbuf;
+ unsigned int i_head, o_head;
+ unsigned int i_tail, o_tail;
+ unsigned int i_mask, o_mask;
+ int ret = 0;

/*
* Potential ABBA deadlock, work around it by ordering lock
@@ -1650,6 +1685,11 @@ static int link_pipe(struct pipe_inode_info *ipipe,
*/
pipe_double_lock(ipipe, opipe);

+ i_tail = ipipe->tail;
+ i_mask = ipipe->ring_size - 1;
+ o_head = opipe->head;
+ o_mask = opipe->ring_size - 1;
+
do {
if (!opipe->readers) {
send_sig(SIGPIPE, current, 0);
@@ -1658,15 +1698,19 @@ static int link_pipe(struct pipe_inode_info *ipipe,
break;
}

+ i_head = ipipe->head;
+ o_tail = opipe->tail;
+
/*
- * If we have iterated all input buffers or ran out of
+ * If we have iterated all input buffers or run out of
* output room, break.
*/
- if (i >= ipipe->nrbufs || opipe->nrbufs >= opipe->buffers)
+ if (pipe_empty(i_head, i_tail) ||
+ pipe_full(o_head, o_tail, opipe->ring_size))
break;

- ibuf = ipipe->bufs + ((ipipe->curbuf + i) & (ipipe->buffers-1));
- nbuf = (opipe->curbuf + opipe->nrbufs) & (opipe->buffers - 1);
+ ibuf = &ipipe->bufs[i_tail & i_mask];
+ obuf = &opipe->bufs[o_head & o_mask];

/*
* Get a reference to this pipe buffer,
@@ -1678,7 +1722,6 @@ static int link_pipe(struct pipe_inode_info *ipipe,
break;
}

- obuf = opipe->bufs + nbuf;
*obuf = *ibuf;

/*
@@ -1691,11 +1734,12 @@ static int link_pipe(struct pipe_inode_info *ipipe,

if (obuf->len > len)
obuf->len = len;
-
- opipe->nrbufs++;
ret += obuf->len;
len -= obuf->len;
- i++;
+
+ o_head++;
+ pipe_commit_write(opipe, o_head);
+ i_tail++;
} while (len);

/*
diff --git a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
index 5c626fdc10db..fad096697ff5 100644
--- a/include/linux/pipe_fs_i.h
+++ b/include/linux/pipe_fs_i.h
@@ -30,9 +30,9 @@ struct pipe_buffer {
* struct pipe_inode_info - a linux kernel pipe
* @mutex: mutex protecting the whole thing
* @wait: reader/writer wait point in case of empty/full pipe
- * @nrbufs: the number of non-empty pipe buffers in this pipe
- * @buffers: total number of buffers (should be a power of 2)
- * @curbuf: the current pipe buffer entry
+ * @head: The point of buffer production
+ * @tail: The point of buffer consumption
+ * @ring_size: total number of buffers (should be a power of 2)
* @tmp_page: cached released page
* @readers: number of current readers of this pipe
* @writers: number of current writers of this pipe
@@ -48,7 +48,9 @@ struct pipe_buffer {
struct pipe_inode_info {
struct mutex mutex;
wait_queue_head_t wait;
- unsigned int nrbufs, curbuf, buffers;
+ unsigned int head;
+ unsigned int tail;
+ unsigned int ring_size;
unsigned int readers;
unsigned int writers;
unsigned int files;
@@ -104,6 +106,82 @@ struct pipe_buf_operations {
bool (*get)(struct pipe_inode_info *, struct pipe_buffer *);
};

+/**
+ * pipe_commit_read - Set pipe buffer tail pointer in the read-side
+ * @pipe: The pipe in question
+ * @tail: The new tail pointer
+ *
+ * Update the tail pointer in the read-side code after a read has taken place.
+ */
+static inline void pipe_commit_read(struct pipe_inode_info *pipe,
+ unsigned int tail)
+{
+ pipe->tail = tail;
+}
+
+/**
+ * pipe_commit_write - Set pipe buffer head pointer in the write-side
+ * @pipe: The pipe in question
+ * @head: The new head pointer
+ *
+ * Update the head pointer in the write-side code after a write has taken place.
+ */
+static inline void pipe_commit_write(struct pipe_inode_info *pipe,
+ unsigned int head)
+{
+ pipe->head = head;
+}
+
+/**
+ * pipe_empty - Return true if the pipe is empty
+ * @head: The pipe ring head pointer
+ * @tail: The pipe ring tail pointer
+ */
+static inline bool pipe_empty(unsigned int head, unsigned int tail)
+{
+ return head == tail;
+}
+
+/**
+ * pipe_occupancy - Return number of slots used in the pipe
+ * @head: The pipe ring head pointer
+ * @tail: The pipe ring tail pointer
+ */
+static inline unsigned int pipe_occupancy(unsigned int head, unsigned int tail)
+{
+ return head - tail;
+}
+
+/**
+ * pipe_full - Return true if the pipe is full
+ * @head: The pipe ring head pointer
+ * @tail: The pipe ring tail pointer
+ * @limit: The maximum amount of slots available.
+ */
+static inline bool pipe_full(unsigned int head, unsigned int tail,
+ unsigned int limit)
+{
+ return pipe_occupancy(head, tail) >= limit;
+}
+
+/**
+ * pipe_space_for_user - Return number of slots available to userspace
+ * @head: The pipe ring head pointer
+ * @tail: The pipe ring tail pointer
+ * @pipe: The pipe info structure
+ */
+static inline unsigned int pipe_space_for_user(unsigned int head, unsigned int tail,
+ struct pipe_inode_info *pipe)
+{
+ unsigned int p_occupancy, p_space;
+
+ p_occupancy = pipe_occupancy(head, tail);
+ if (p_occupancy >= pipe->ring_size)
+ return 0;
+ p_space = pipe->ring_size - p_occupancy;
+ return p_space;
+}
+
/**
* pipe_buf_get - get a reference to a pipe_buffer
* @pipe: the pipe that the buffer belongs to
diff --git a/include/linux/uio.h b/include/linux/uio.h
index ab5f523bc0df..9576fd8158d7 100644
--- a/include/linux/uio.h
+++ b/include/linux/uio.h
@@ -45,8 +45,8 @@ struct iov_iter {
union {
unsigned long nr_segs;
struct {
- int idx;
- int start_idx;
+ unsigned int head;
+ unsigned int start_head;
};
};
};
diff --git a/lib/iov_iter.c b/lib/iov_iter.c
index 639d5e7014c1..150a40bdb21a 100644
--- a/lib/iov_iter.c
+++ b/lib/iov_iter.c
@@ -325,28 +325,33 @@ static size_t copy_page_from_iter_iovec(struct page *page, size_t offset, size_t
static bool sanity(const struct iov_iter *i)
{
struct pipe_inode_info *pipe = i->pipe;
- int idx = i->idx;
- int next = pipe->curbuf + pipe->nrbufs;
+ unsigned int p_head = pipe->head;
+ unsigned int p_tail = pipe->tail;
+ unsigned int p_mask = pipe->ring_size - 1;
+ unsigned int p_occupancy = pipe_occupancy(p_head, p_tail);
+ unsigned int i_head = i->head;
+ unsigned int idx;
+
if (i->iov_offset) {
struct pipe_buffer *p;
- if (unlikely(!pipe->nrbufs))
+ if (unlikely(p_occupancy == 0))
goto Bad; // pipe must be non-empty
- if (unlikely(idx != ((next - 1) & (pipe->buffers - 1))))
+ if (unlikely(i_head != p_head - 1))
goto Bad; // must be at the last buffer...

- p = &pipe->bufs[idx];
+ p = &pipe->bufs[i_head & p_mask];
if (unlikely(p->offset + p->len != i->iov_offset))
goto Bad; // ... at the end of segment
} else {
- if (idx != (next & (pipe->buffers - 1)))
+ if (i_head != p_head)
goto Bad; // must be right after the last buffer
}
return true;
Bad:
- printk(KERN_ERR "idx = %d, offset = %zd\n", i->idx, i->iov_offset);
- printk(KERN_ERR "curbuf = %d, nrbufs = %d, buffers = %d\n",
- pipe->curbuf, pipe->nrbufs, pipe->buffers);
- for (idx = 0; idx < pipe->buffers; idx++)
+ printk(KERN_ERR "idx = %d, offset = %zd\n", i_head, i->iov_offset);
+ printk(KERN_ERR "head = %d, tail = %d, buffers = %d\n",
+ p_head, p_tail, pipe->ring_size);
+ for (idx = 0; idx < pipe->ring_size; idx++)
printk(KERN_ERR "[%p %p %d %d]\n",
pipe->bufs[idx].ops,
pipe->bufs[idx].page,
@@ -359,18 +364,15 @@ static bool sanity(const struct iov_iter *i)
#define sanity(i) true
#endif

-static inline int next_idx(int idx, struct pipe_inode_info *pipe)
-{
- return (idx + 1) & (pipe->buffers - 1);
-}
-
static size_t copy_page_to_iter_pipe(struct page *page, size_t offset, size_t bytes,
struct iov_iter *i)
{
struct pipe_inode_info *pipe = i->pipe;
struct pipe_buffer *buf;
+ unsigned int p_tail = pipe->tail;
+ unsigned int p_mask = pipe->ring_size - 1;
+ unsigned int i_head = i->head;
size_t off;
- int idx;

if (unlikely(bytes > i->count))
bytes = i->count;
@@ -382,8 +384,7 @@ static size_t copy_page_to_iter_pipe(struct page *page, size_t offset, size_t by
return 0;

off = i->iov_offset;
- idx = i->idx;
- buf = &pipe->bufs[idx];
+ buf = &pipe->bufs[i_head & p_mask];
if (off) {
if (offset == off && buf->page == page) {
/* merge with the last one */
@@ -391,18 +392,21 @@ static size_t copy_page_to_iter_pipe(struct page *page, size_t offset, size_t by
i->iov_offset += bytes;
goto out;
}
- idx = next_idx(idx, pipe);
- buf = &pipe->bufs[idx];
+ i_head++;
+ buf = &pipe->bufs[i_head & p_mask];
}
- if (idx == pipe->curbuf && pipe->nrbufs)
+ if (pipe_full(i_head, p_tail, pipe->ring_size))
return 0;
- pipe->nrbufs++;
+
buf->ops = &page_cache_pipe_buf_ops;
- get_page(buf->page = page);
+ get_page(page);
+ buf->page = page;
buf->offset = offset;
buf->len = bytes;
+
+ pipe_commit_read(pipe, i_head);
i->iov_offset = offset + bytes;
- i->idx = idx;
+ i->head = i_head;
out:
i->count -= bytes;
return bytes;
@@ -480,24 +484,30 @@ static inline bool allocated(struct pipe_buffer *buf)
return buf->ops == &default_pipe_buf_ops;
}

-static inline void data_start(const struct iov_iter *i, int *idxp, size_t *offp)
+static inline void data_start(const struct iov_iter *i,
+ unsigned int *iter_headp, size_t *offp)
{
+ unsigned int p_mask = i->pipe->ring_size - 1;
+ unsigned int iter_head = i->head;
size_t off = i->iov_offset;
- int idx = i->idx;
- if (off && (!allocated(&i->pipe->bufs[idx]) || off == PAGE_SIZE)) {
- idx = next_idx(idx, i->pipe);
+
+ if (off && (!allocated(&i->pipe->bufs[iter_head & p_mask]) ||
+ off == PAGE_SIZE)) {
+ iter_head++;
off = 0;
}
- *idxp = idx;
+ *iter_headp = iter_head;
*offp = off;
}

static size_t push_pipe(struct iov_iter *i, size_t size,
- int *idxp, size_t *offp)
+ int *iter_headp, size_t *offp)
{
struct pipe_inode_info *pipe = i->pipe;
+ unsigned int p_tail = pipe->tail;
+ unsigned int p_mask = pipe->ring_size - 1;
+ unsigned int iter_head;
size_t off;
- int idx;
ssize_t left;

if (unlikely(size > i->count))
@@ -506,33 +516,34 @@ static size_t push_pipe(struct iov_iter *i, size_t size,
return 0;

left = size;
- data_start(i, &idx, &off);
- *idxp = idx;
+ data_start(i, &iter_head, &off);
+ *iter_headp = iter_head;
*offp = off;
if (off) {
left -= PAGE_SIZE - off;
if (left <= 0) {
- pipe->bufs[idx].len += size;
+ pipe->bufs[iter_head & p_mask].len += size;
return size;
}
- pipe->bufs[idx].len = PAGE_SIZE;
- idx = next_idx(idx, pipe);
+ pipe->bufs[iter_head & p_mask].len = PAGE_SIZE;
+ iter_head++;
}
- while (idx != pipe->curbuf || !pipe->nrbufs) {
+ while (!pipe_full(iter_head, p_tail, pipe->ring_size)) {
+ struct pipe_buffer *buf = &pipe->bufs[iter_head & p_mask];
struct page *page = alloc_page(GFP_USER);
if (!page)
break;
- pipe->nrbufs++;
- pipe->bufs[idx].ops = &default_pipe_buf_ops;
- pipe->bufs[idx].page = page;
- pipe->bufs[idx].offset = 0;
- if (left <= PAGE_SIZE) {
- pipe->bufs[idx].len = left;
+
+ buf->ops = &default_pipe_buf_ops;
+ buf->page = page;
+ buf->offset = 0;
+ buf->len = max_t(ssize_t, left, PAGE_SIZE);
+ left -= buf->len;
+ iter_head++;
+ pipe_commit_write(pipe, iter_head);
+
+ if (left == 0)
return size;
- }
- pipe->bufs[idx].len = PAGE_SIZE;
- left -= PAGE_SIZE;
- idx = next_idx(idx, pipe);
}
return size - left;
}
@@ -541,23 +552,26 @@ static size_t copy_pipe_to_iter(const void *addr, size_t bytes,
struct iov_iter *i)
{
struct pipe_inode_info *pipe = i->pipe;
+ unsigned int p_mask = pipe->ring_size - 1;
+ unsigned int i_head;
size_t n, off;
- int idx;

if (!sanity(i))
return 0;

- bytes = n = push_pipe(i, bytes, &idx, &off);
+ bytes = n = push_pipe(i, bytes, &i_head, &off);
if (unlikely(!n))
return 0;
- for ( ; n; idx = next_idx(idx, pipe), off = 0) {
+ do {
size_t chunk = min_t(size_t, n, PAGE_SIZE - off);
- memcpy_to_page(pipe->bufs[idx].page, off, addr, chunk);
- i->idx = idx;
+ memcpy_to_page(pipe->bufs[i_head & p_mask].page, off, addr, chunk);
+ i->head = i_head;
i->iov_offset = off + chunk;
n -= chunk;
addr += chunk;
- }
+ off = 0;
+ i_head++;
+ } while (n);
i->count -= bytes;
return bytes;
}
@@ -573,28 +587,31 @@ static size_t csum_and_copy_to_pipe_iter(const void *addr, size_t bytes,
__wsum *csum, struct iov_iter *i)
{
struct pipe_inode_info *pipe = i->pipe;
+ unsigned int p_mask = pipe->ring_size - 1;
+ unsigned int i_head;
size_t n, r;
size_t off = 0;
__wsum sum = *csum;
- int idx;

if (!sanity(i))
return 0;

- bytes = n = push_pipe(i, bytes, &idx, &r);
+ bytes = n = push_pipe(i, bytes, &i_head, &r);
if (unlikely(!n))
return 0;
- for ( ; n; idx = next_idx(idx, pipe), r = 0) {
+ do {
size_t chunk = min_t(size_t, n, PAGE_SIZE - r);
- char *p = kmap_atomic(pipe->bufs[idx].page);
+ char *p = kmap_atomic(pipe->bufs[i_head & p_mask].page);
sum = csum_and_memcpy(p + r, addr, chunk, sum, off);
kunmap_atomic(p);
- i->idx = idx;
+ i->head = i_head;
i->iov_offset = r + chunk;
n -= chunk;
off += chunk;
addr += chunk;
- }
+ r = 0;
+ i_head++;
+ } while (n);
i->count -= bytes;
*csum = sum;
return bytes;
@@ -645,29 +662,32 @@ static size_t copy_pipe_to_iter_mcsafe(const void *addr, size_t bytes,
struct iov_iter *i)
{
struct pipe_inode_info *pipe = i->pipe;
+ unsigned int p_mask = pipe->ring_size - 1;
+ unsigned int i_head;
size_t n, off, xfer = 0;
- int idx;

if (!sanity(i))
return 0;

- bytes = n = push_pipe(i, bytes, &idx, &off);
+ bytes = n = push_pipe(i, bytes, &i_head, &off);
if (unlikely(!n))
return 0;
- for ( ; n; idx = next_idx(idx, pipe), off = 0) {
+ do {
size_t chunk = min_t(size_t, n, PAGE_SIZE - off);
unsigned long rem;

- rem = memcpy_mcsafe_to_page(pipe->bufs[idx].page, off, addr,
- chunk);
- i->idx = idx;
+ rem = memcpy_mcsafe_to_page(pipe->bufs[i_head & p_mask].page,
+ off, addr, chunk);
+ i->head = i_head;
i->iov_offset = off + chunk - rem;
xfer += chunk - rem;
if (rem)
break;
n -= chunk;
addr += chunk;
- }
+ off = 0;
+ i_head++;
+ } while (n);
i->count -= xfer;
return xfer;
}
@@ -925,6 +945,8 @@ EXPORT_SYMBOL(copy_page_from_iter);
static size_t pipe_zero(size_t bytes, struct iov_iter *i)
{
struct pipe_inode_info *pipe = i->pipe;
+ unsigned int p_mask = pipe->ring_size - 1;
+ unsigned int i_head;
size_t n, off;
int idx;

@@ -935,13 +957,15 @@ static size_t pipe_zero(size_t bytes, struct iov_iter *i)
if (unlikely(!n))
return 0;

- for ( ; n; idx = next_idx(idx, pipe), off = 0) {
+ do {
size_t chunk = min_t(size_t, n, PAGE_SIZE - off);
- memzero_page(pipe->bufs[idx].page, off, chunk);
- i->idx = idx;
+ memzero_page(pipe->bufs[i_head & p_mask].page, off, chunk);
+ i->head = i_head;
i->iov_offset = off + chunk;
n -= chunk;
- }
+ off = 0;
+ i_head++;
+ } while (n);
i->count -= bytes;
return bytes;
}
@@ -987,20 +1011,26 @@ EXPORT_SYMBOL(iov_iter_copy_from_user_atomic);
static inline void pipe_truncate(struct iov_iter *i)
{
struct pipe_inode_info *pipe = i->pipe;
- if (pipe->nrbufs) {
+ unsigned int p_tail = pipe->tail;
+ unsigned int p_head = pipe->head;
+ unsigned int p_mask = pipe->ring_size - 1;
+
+ if (!pipe_empty(p_head, p_tail)) {
+ struct pipe_buffer *buf;
+ unsigned int i_head = i->head;
size_t off = i->iov_offset;
- int idx = i->idx;
- int nrbufs = (idx - pipe->curbuf) & (pipe->buffers - 1);
+
if (off) {
- pipe->bufs[idx].len = off - pipe->bufs[idx].offset;
- idx = next_idx(idx, pipe);
- nrbufs++;
+ buf = &pipe->bufs[i_head & p_mask];
+ buf->len = off - buf->offset;
+ i_head++;
}
- while (pipe->nrbufs > nrbufs) {
- pipe_buf_release(pipe, &pipe->bufs[idx]);
- idx = next_idx(idx, pipe);
- pipe->nrbufs--;
+ while (p_head != i_head) {
+ p_head--;
+ pipe_buf_release(pipe, &pipe->bufs[p_head & p_mask]);
}
+
+ pipe_commit_write(pipe, p_head);
}
}

@@ -1011,18 +1041,20 @@ static void pipe_advance(struct iov_iter *i, size_t size)
size = i->count;
if (size) {
struct pipe_buffer *buf;
+ unsigned int p_mask = pipe->ring_size - 1;
+ unsigned int i_head = i->head;
size_t off = i->iov_offset, left = size;
- int idx = i->idx;
+
if (off) /* make it relative to the beginning of buffer */
- left += off - pipe->bufs[idx].offset;
+ left += off - pipe->bufs[i_head & p_mask].offset;
while (1) {
- buf = &pipe->bufs[idx];
+ buf = &pipe->bufs[i_head & p_mask];
if (left <= buf->len)
break;
left -= buf->len;
- idx = next_idx(idx, pipe);
+ i_head++;
}
- i->idx = idx;
+ i->head = i_head;
i->iov_offset = buf->offset + left;
}
i->count -= size;
@@ -1053,25 +1085,27 @@ void iov_iter_revert(struct iov_iter *i, size_t unroll)
i->count += unroll;
if (unlikely(iov_iter_is_pipe(i))) {
struct pipe_inode_info *pipe = i->pipe;
- int idx = i->idx;
+ unsigned int p_mask = pipe->ring_size - 1;
+ unsigned int i_head = i->head;
size_t off = i->iov_offset;
while (1) {
- size_t n = off - pipe->bufs[idx].offset;
+ struct pipe_buffer *b = &pipe->bufs[i_head & p_mask];
+ size_t n = off - b->offset;
if (unroll < n) {
off -= unroll;
break;
}
unroll -= n;
- if (!unroll && idx == i->start_idx) {
+ if (!unroll && i_head == i->start_head) {
off = 0;
break;
}
- if (!idx--)
- idx = pipe->buffers - 1;
- off = pipe->bufs[idx].offset + pipe->bufs[idx].len;
+ i_head--;
+ b = &pipe->bufs[i_head & p_mask];
+ off = b->offset + b->len;
}
i->iov_offset = off;
- i->idx = idx;
+ i->head = i_head;
pipe_truncate(i);
return;
}
@@ -1159,13 +1193,13 @@ void iov_iter_pipe(struct iov_iter *i, unsigned int direction,
size_t count)
{
BUG_ON(direction != READ);
- WARN_ON(pipe->nrbufs == pipe->buffers);
+ WARN_ON(pipe_full(pipe->head, pipe->tail, pipe->ring_size));
i->type = ITER_PIPE | READ;
i->pipe = pipe;
- i->idx = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
+ i->head = pipe->head;
i->iov_offset = 0;
i->count = count;
- i->start_idx = i->idx;
+ i->start_head = i->head;
}
EXPORT_SYMBOL(iov_iter_pipe);

@@ -1189,11 +1223,12 @@ EXPORT_SYMBOL(iov_iter_discard);

unsigned long iov_iter_alignment(const struct iov_iter *i)
{
+ unsigned int p_mask = i->pipe->ring_size - 1;
unsigned long res = 0;
size_t size = i->count;

if (unlikely(iov_iter_is_pipe(i))) {
- if (size && i->iov_offset && allocated(&i->pipe->bufs[i->idx]))
+ if (size && i->iov_offset && allocated(&i->pipe->bufs[i->head & p_mask]))
return size | i->iov_offset;
return size;
}
@@ -1231,19 +1266,20 @@ EXPORT_SYMBOL(iov_iter_gap_alignment);
static inline ssize_t __pipe_get_pages(struct iov_iter *i,
size_t maxsize,
struct page **pages,
- int idx,
+ int iter_head,
size_t *start)
{
struct pipe_inode_info *pipe = i->pipe;
- ssize_t n = push_pipe(i, maxsize, &idx, start);
+ unsigned int p_mask = pipe->ring_size - 1;
+ ssize_t n = push_pipe(i, maxsize, &iter_head, start);
if (!n)
return -EFAULT;

maxsize = n;
n += *start;
while (n > 0) {
- get_page(*pages++ = pipe->bufs[idx].page);
- idx = next_idx(idx, pipe);
+ get_page(*pages++ = pipe->bufs[iter_head & p_mask].page);
+ iter_head++;
n -= PAGE_SIZE;
}

@@ -1254,9 +1290,8 @@ static ssize_t pipe_get_pages(struct iov_iter *i,
struct page **pages, size_t maxsize, unsigned maxpages,
size_t *start)
{
- unsigned npages;
+ unsigned int iter_head, npages;
size_t capacity;
- int idx;

if (!maxsize)
return 0;
@@ -1264,12 +1299,12 @@ static ssize_t pipe_get_pages(struct iov_iter *i,
if (!sanity(i))
return -EFAULT;

- data_start(i, &idx, start);
- /* some of this one + all after this one */
- npages = ((i->pipe->curbuf - idx - 1) & (i->pipe->buffers - 1)) + 1;
- capacity = min(npages,maxpages) * PAGE_SIZE - *start;
+ data_start(i, &iter_head, start);
+ /* Amount of free space: some of this one + all after this one */
+ npages = pipe_space_for_user(iter_head, i->pipe->tail, i->pipe);
+ capacity = min(npages, maxpages) * PAGE_SIZE - *start;

- return __pipe_get_pages(i, min(maxsize, capacity), pages, idx, start);
+ return __pipe_get_pages(i, min(maxsize, capacity), pages, iter_head, start);
}

ssize_t iov_iter_get_pages(struct iov_iter *i,
@@ -1323,9 +1358,8 @@ static ssize_t pipe_get_pages_alloc(struct iov_iter *i,
size_t *start)
{
struct page **p;
+ unsigned int iter_head, npages;
ssize_t n;
- int idx;
- int npages;

if (!maxsize)
return 0;
@@ -1333,9 +1367,9 @@ static ssize_t pipe_get_pages_alloc(struct iov_iter *i,
if (!sanity(i))
return -EFAULT;

- data_start(i, &idx, start);
- /* some of this one + all after this one */
- npages = ((i->pipe->curbuf - idx - 1) & (i->pipe->buffers - 1)) + 1;
+ data_start(i, &iter_head, start);
+ /* Amount of free space: some of this one + all after this one */
+ npages = pipe_space_for_user(iter_head, i->pipe->tail, i->pipe);
n = npages * PAGE_SIZE - *start;
if (maxsize > n)
maxsize = n;
@@ -1344,7 +1378,7 @@ static ssize_t pipe_get_pages_alloc(struct iov_iter *i,
p = get_pages_array(npages);
if (!p)
return -ENOMEM;
- n = __pipe_get_pages(i, maxsize, p, idx, start);
+ n = __pipe_get_pages(i, maxsize, p, iter_head, start);
if (n > 0)
*pages = p;
else
@@ -1560,15 +1594,15 @@ int iov_iter_npages(const struct iov_iter *i, int maxpages)

if (unlikely(iov_iter_is_pipe(i))) {
struct pipe_inode_info *pipe = i->pipe;
+ unsigned int iter_head;
size_t off;
- int idx;

if (!sanity(i))
return 0;

- data_start(i, &idx, &off);
+ data_start(i, &iter_head, &off);
/* some of this one + all after this one */
- npages = ((pipe->curbuf - idx - 1) & (pipe->buffers - 1)) + 1;
+ npages = pipe_space_for_user(iter_head, pipe->tail, pipe);
if (npages >= maxpages)
return maxpages;
} else iterate_all_kinds(i, size, v, ({

2019-10-24 14:15:02

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 03/10] Add wake_up_interruptible_sync_poll_locked() [ver #2]

Add a wakeup call for a case whereby the caller already has the waitqueue
spinlock held. This can be used by pipes to alter the ring buffer indices
and issue a wakeup under the same spinlock.

Signed-off-by: David Howells <[email protected]>
---

include/linux/wait.h | 3 +++
kernel/sched/wait.c | 23 +++++++++++++++++++++++
2 files changed, 26 insertions(+)

diff --git a/include/linux/wait.h b/include/linux/wait.h
index bb7676d396cd..3283c8d02137 100644
--- a/include/linux/wait.h
+++ b/include/linux/wait.h
@@ -202,6 +202,7 @@ void __wake_up_locked_key(struct wait_queue_head *wq_head, unsigned int mode, vo
void __wake_up_locked_key_bookmark(struct wait_queue_head *wq_head,
unsigned int mode, void *key, wait_queue_entry_t *bookmark);
void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode, void *key);
+void __wake_up_locked_sync_key(struct wait_queue_head *wq_head, unsigned int mode, void *key);
void __wake_up_locked(struct wait_queue_head *wq_head, unsigned int mode, int nr);
void __wake_up_sync(struct wait_queue_head *wq_head, unsigned int mode);

@@ -229,6 +230,8 @@ void __wake_up_sync(struct wait_queue_head *wq_head, unsigned int mode);
__wake_up(x, TASK_INTERRUPTIBLE, 1, poll_to_key(m))
#define wake_up_interruptible_sync_poll(x, m) \
__wake_up_sync_key((x), TASK_INTERRUPTIBLE, poll_to_key(m))
+#define wake_up_interruptible_sync_poll_locked(x, m) \
+ __wake_up_locked_sync_key((x), TASK_INTERRUPTIBLE, poll_to_key(m))

#define ___wait_cond_timeout(condition) \
({ \
diff --git a/kernel/sched/wait.c b/kernel/sched/wait.c
index b4b52361dab7..ba059fbfc53a 100644
--- a/kernel/sched/wait.c
+++ b/kernel/sched/wait.c
@@ -191,6 +191,29 @@ void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode,
}
EXPORT_SYMBOL_GPL(__wake_up_sync_key);

+/**
+ * __wake_up_locked_sync_key - wake up a thread blocked on a locked waitqueue.
+ * @wq_head: the waitqueue
+ * @mode: which threads
+ * @key: opaque value to be passed to wakeup targets
+ *
+ * The sync wakeup differs in that the waker knows that it will schedule
+ * away soon, so while the target thread will be woken up, it will not
+ * be migrated to another CPU - ie. the two threads are 'synchronized'
+ * with each other. This can prevent needless bouncing between CPUs.
+ *
+ * On UP it can prevent extra preemption.
+ *
+ * If this function wakes up a task, it executes a full memory barrier before
+ * accessing the task state.
+ */
+void __wake_up_locked_sync_key(struct wait_queue_head *wq_head,
+ unsigned int mode, void *key)
+{
+ __wake_up_common(wq_head, mode, 1, WF_SYNC, key, NULL);
+}
+EXPORT_SYMBOL_GPL(__wake_up_locked_sync_key);
+
/*
* __wake_up_sync - see __wake_up_sync_key()
*/

2019-10-24 14:15:38

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 05/10] pipe: Allow pipes to have kernel-reserved slots [ver #2]

Split pipe->ring_size into two numbers:

(1) pipe->ring_size - indicates the hard size of the pipe ring.

(2) pipe->max_usage - indicates the maximum number of pipe ring slots that
userspace orchestrated events can fill.

This allows for a pipe that is both writable by the general kernel
notification facility and by userspace, allowing plenty of ring space for
notifications to be added whilst preventing userspace from being able to
pin too much unswappable kernel space.

Signed-off-by: David Howells <[email protected]>
---

fs/fuse/dev.c | 8 ++++----
fs/pipe.c | 10 ++++++----
fs/splice.c | 26 +++++++++++++-------------
include/linux/pipe_fs_i.h | 6 +++++-
lib/iov_iter.c | 4 ++--
5 files changed, 30 insertions(+), 24 deletions(-)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 1e4bc27573cc..5ef57a322cb8 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -703,7 +703,7 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
cs->pipebufs++;
cs->nr_segs--;
} else {
- if (cs->nr_segs >= cs->pipe->ring_size)
+ if (cs->nr_segs >= cs->pipe->max_usage)
return -EIO;

page = alloc_page(GFP_HIGHUSER);
@@ -879,7 +879,7 @@ static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page,
struct pipe_buffer *buf;
int err;

- if (cs->nr_segs >= cs->pipe->ring_size)
+ if (cs->nr_segs >= cs->pipe->max_usage)
return -EIO;

err = unlock_request(cs->req);
@@ -1341,7 +1341,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
if (!fud)
return -EPERM;

- bufs = kvmalloc_array(pipe->ring_size, sizeof(struct pipe_buffer),
+ bufs = kvmalloc_array(pipe->max_usage, sizeof(struct pipe_buffer),
GFP_KERNEL);
if (!bufs)
return -ENOMEM;
@@ -1353,7 +1353,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
if (ret < 0)
goto out;

- if (pipe_occupancy(pipe->head, pipe->tail) + cs.nr_segs > pipe->ring_size) {
+ if (pipe_occupancy(pipe->head, pipe->tail) + cs.nr_segs > pipe->max_usage) {
ret = -EIO;
goto out;
}
diff --git a/fs/pipe.c b/fs/pipe.c
index 8a0806fe12d3..12e47cae9425 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -403,7 +403,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)

tail = pipe->tail;
head = pipe->head;
- max_usage = pipe->ring_size;
+ max_usage = pipe->max_usage;
mask = pipe->ring_size - 1;

/* We try to merge small writes */
@@ -570,7 +570,7 @@ pipe_poll(struct file *filp, poll_table *wait)
}

if (filp->f_mode & FMODE_WRITE) {
- if (!pipe_full(head, tail, pipe->ring_size))
+ if (!pipe_full(head, tail, pipe->max_usage))
mask |= EPOLLOUT | EPOLLWRNORM;
/*
* Most Unices do not set EPOLLERR for FIFOs but on Linux they
@@ -695,6 +695,7 @@ struct pipe_inode_info *alloc_pipe_info(void)
if (pipe->bufs) {
init_waitqueue_head(&pipe->wait);
pipe->r_counter = pipe->w_counter = 1;
+ pipe->max_usage = pipe_bufs;
pipe->ring_size = pipe_bufs;
pipe->user = user;
mutex_init(&pipe->mutex);
@@ -1149,9 +1150,10 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
kfree(pipe->bufs);
pipe->bufs = bufs;
pipe->ring_size = nr_slots;
+ pipe->max_usage = nr_slots;
pipe->tail = tail;
pipe->head = head;
- return pipe->ring_size * PAGE_SIZE;
+ return pipe->max_usage * PAGE_SIZE;

out_revert_acct:
(void) account_pipe_buffers(pipe->user, nr_slots, pipe->ring_size);
@@ -1184,7 +1186,7 @@ long pipe_fcntl(struct file *file, unsigned int cmd, unsigned long arg)
ret = pipe_set_size(pipe, arg);
break;
case F_GETPIPE_SZ:
- ret = pipe->ring_size * PAGE_SIZE;
+ ret = pipe->max_usage * PAGE_SIZE;
break;
default:
ret = -EINVAL;
diff --git a/fs/splice.c b/fs/splice.c
index bbc025236ff9..03b96eae084b 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -199,7 +199,7 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
goto out;
}

- while (!pipe_full(head, tail, pipe->ring_size)) {
+ while (!pipe_full(head, tail, pipe->max_usage)) {
struct pipe_buffer *buf = &pipe->bufs[head & mask];

buf->page = spd->pages[page_nr];
@@ -239,7 +239,7 @@ ssize_t add_to_pipe(struct pipe_inode_info *pipe, struct pipe_buffer *buf)
if (unlikely(!pipe->readers)) {
send_sig(SIGPIPE, current, 0);
ret = -EPIPE;
- } else if (pipe_full(head, tail, pipe->ring_size)) {
+ } else if (pipe_full(head, tail, pipe->max_usage)) {
ret = -EAGAIN;
} else {
pipe->bufs[head & mask] = *buf;
@@ -257,7 +257,7 @@ EXPORT_SYMBOL(add_to_pipe);
*/
int splice_grow_spd(const struct pipe_inode_info *pipe, struct splice_pipe_desc *spd)
{
- unsigned int max_usage = READ_ONCE(pipe->ring_size);
+ unsigned int max_usage = READ_ONCE(pipe->max_usage);

spd->nr_pages_max = max_usage;
if (max_usage <= PIPE_DEF_BUFFERS)
@@ -381,7 +381,7 @@ static ssize_t default_file_splice_read(struct file *in, loff_t *ppos,
ssize_t res;
int i;

- if (pipe_full(pipe->head, pipe->tail, pipe->ring_size))
+ if (pipe_full(pipe->head, pipe->tail, pipe->max_usage))
return -EAGAIN;

/*
@@ -698,7 +698,7 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
.pos = *ppos,
.u.file = out,
};
- int nbufs = pipe->ring_size;
+ int nbufs = pipe->max_usage;
struct bio_vec *array = kcalloc(nbufs, sizeof(struct bio_vec),
GFP_KERNEL);
ssize_t ret;
@@ -721,9 +721,9 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
if (ret <= 0)
break;

- if (unlikely(nbufs < pipe->ring_size)) {
+ if (unlikely(nbufs < pipe->max_usage)) {
kfree(array);
- nbufs = pipe->ring_size;
+ nbufs = pipe->max_usage;
array = kcalloc(nbufs, sizeof(struct bio_vec),
GFP_KERNEL);
if (!array) {
@@ -963,7 +963,7 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
loff_t pos = sd->pos, prev_pos = pos;

/* Don't try to read more the pipe has space for. */
- p_space = pipe->ring_size -
+ p_space = pipe->max_usage -
pipe_occupancy(pipe->head, pipe->tail);
read_len = min_t(size_t, len, p_space << PAGE_SHIFT);
ret = do_splice_to(in, &pos, pipe, read_len, flags);
@@ -1090,7 +1090,7 @@ static int wait_for_space(struct pipe_inode_info *pipe, unsigned flags)
send_sig(SIGPIPE, current, 0);
return -EPIPE;
}
- if (!pipe_full(pipe->head, pipe->tail, pipe->ring_size))
+ if (!pipe_full(pipe->head, pipe->tail, pipe->max_usage))
return 0;
if (flags & SPLICE_F_NONBLOCK)
return -EAGAIN;
@@ -1498,13 +1498,13 @@ static int opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
* Check ->nrbufs without the inode lock first. This function
* is speculative anyways, so missing one is ok.
*/
- if (pipe_full(pipe->head, pipe->tail, pipe->ring_size))
+ if (pipe_full(pipe->head, pipe->tail, pipe->max_usage))
return 0;

ret = 0;
pipe_lock(pipe);

- while (pipe_full(pipe->head, pipe->tail, pipe->ring_size)) {
+ while (pipe_full(pipe->head, pipe->tail, pipe->max_usage)) {
if (!pipe->readers) {
send_sig(SIGPIPE, current, 0);
ret = -EPIPE;
@@ -1584,7 +1584,7 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
* pipe is empty or the output pipe is full.
*/
if (pipe_empty(i_head, i_tail) ||
- pipe_full(o_head, o_tail, opipe->ring_size)) {
+ pipe_full(o_head, o_tail, opipe->max_usage)) {
/* Already processed some buffers, break */
if (ret)
break;
@@ -1706,7 +1706,7 @@ static int link_pipe(struct pipe_inode_info *ipipe,
* output room, break.
*/
if (pipe_empty(i_head, i_tail) ||
- pipe_full(o_head, o_tail, opipe->ring_size))
+ pipe_full(o_head, o_tail, opipe->max_usage))
break;

ibuf = &ipipe->bufs[i_tail & i_mask];
diff --git a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
index fad096697ff5..90055ff16550 100644
--- a/include/linux/pipe_fs_i.h
+++ b/include/linux/pipe_fs_i.h
@@ -32,6 +32,7 @@ struct pipe_buffer {
* @wait: reader/writer wait point in case of empty/full pipe
* @head: The point of buffer production
* @tail: The point of buffer consumption
+ * @max_usage: The maximum number of slots that may be used in the ring
* @ring_size: total number of buffers (should be a power of 2)
* @tmp_page: cached released page
* @readers: number of current readers of this pipe
@@ -50,6 +51,7 @@ struct pipe_inode_info {
wait_queue_head_t wait;
unsigned int head;
unsigned int tail;
+ unsigned int max_usage;
unsigned int ring_size;
unsigned int readers;
unsigned int writers;
@@ -176,9 +178,11 @@ static inline unsigned int pipe_space_for_user(unsigned int head, unsigned int t
unsigned int p_occupancy, p_space;

p_occupancy = pipe_occupancy(head, tail);
- if (p_occupancy >= pipe->ring_size)
+ if (p_occupancy >= pipe->max_usage)
return 0;
p_space = pipe->ring_size - p_occupancy;
+ if (p_space > pipe->max_usage)
+ p_space = pipe->max_usage;
return p_space;
}

diff --git a/lib/iov_iter.c b/lib/iov_iter.c
index 150a40bdb21a..134737e9d4ee 100644
--- a/lib/iov_iter.c
+++ b/lib/iov_iter.c
@@ -395,7 +395,7 @@ static size_t copy_page_to_iter_pipe(struct page *page, size_t offset, size_t by
i_head++;
buf = &pipe->bufs[i_head & p_mask];
}
- if (pipe_full(i_head, p_tail, pipe->ring_size))
+ if (pipe_full(i_head, p_tail, pipe->max_usage))
return 0;

buf->ops = &page_cache_pipe_buf_ops;
@@ -528,7 +528,7 @@ static size_t push_pipe(struct iov_iter *i, size_t size,
pipe->bufs[iter_head & p_mask].len = PAGE_SIZE;
iter_head++;
}
- while (!pipe_full(iter_head, p_tail, pipe->ring_size)) {
+ while (!pipe_full(iter_head, p_tail, pipe->max_usage)) {
struct pipe_buffer *buf = &pipe->bufs[iter_head & p_mask];
struct page *page = alloc_page(GFP_USER);
if (!page)

2019-10-24 14:17:03

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 06/10] pipe: Advance tail pointer inside of wait spinlock in pipe_read() [ver #2]

Advance the pipe ring tail pointer inside of wait spinlock in pipe_read()
so that the pipe can be written into with kernel notifications from
contexts where pipe->mutex cannot be taken.

Signed-off-by: David Howells <[email protected]>
---

fs/pipe.c | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 12e47cae9425..1274305772fb 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -324,9 +324,14 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)

if (!buf->len) {
pipe_buf_release(pipe, buf);
+ spin_lock_irq(&pipe->wait.lock);
tail++;
pipe_commit_read(pipe, tail);
- do_wakeup = 1;
+ do_wakeup = 0;
+ wake_up_interruptible_sync_poll_locked(
+ &pipe->wait, EPOLLOUT | EPOLLWRNORM);
+ spin_unlock_irq(&pipe->wait.lock);
+ kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}
total_len -= chars;
if (!total_len)
@@ -358,6 +363,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
if (do_wakeup) {
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
+ do_wakeup = 0;
}
pipe_wait(pipe);
}

2019-10-24 14:17:53

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 08/10] pipe: Rearrange sequence in pipe_write() to preallocate slot [ver #2]

Rearrange the sequence in pipe_write() so that the allocation of the new
buffer, the allocation of a ring slot and the attachment to the ring is
done under the pipe wait spinlock and then the lock is dropped and the
buffer can be filled.

The data copy needs to be done with the spinlock unheld and irqs enabled,
so the lock needs to be dropped first. However, the reader can't progress
as we're holding pipe->mutex.

We also need to drop the lock as that would impact others looking at the
pipe waitqueue, such as poll(), the consumer and a future kernel message
writer.

We just abandon the preallocated slot if we get a copy error. Future
writes may continue it and a future read will eventually recycle it.

Signed-off-by: David Howells <[email protected]>
---

fs/pipe.c | 51 +++++++++++++++++++++++++++++++++------------------
1 file changed, 33 insertions(+), 18 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index e3a8f10750c9..1bfad2212b95 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -386,7 +386,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
{
struct file *filp = iocb->ki_filp;
struct pipe_inode_info *pipe = filp->private_data;
- unsigned int head, tail, max_usage, mask;
+ unsigned int head, max_usage, mask;
ssize_t ret = 0;
int do_wakeup = 0;
size_t total_len = iov_iter_count(from);
@@ -404,14 +404,13 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
goto out;
}

- tail = pipe->tail;
head = pipe->head;
max_usage = pipe->max_usage;
mask = pipe->ring_size - 1;

/* We try to merge small writes */
chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */
- if (!pipe_empty(head, tail) && chars != 0) {
+ if (!pipe_empty(head, pipe->tail) && chars != 0) {
struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask];
int offset = buf->offset + buf->len;

@@ -440,8 +439,8 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
break;
}

- tail = pipe->tail;
- if (!pipe_full(head, tail, max_usage)) {
+ head = pipe->head;
+ if (!pipe_full(head, pipe->tail, max_usage)) {
struct pipe_buffer *buf = &pipe->bufs[head & mask];
struct page *page = pipe->tmp_page;
int copied;
@@ -454,40 +453,56 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
}
pipe->tmp_page = page;
}
+
+ /* Allocate a slot in the ring in advance and attach an
+ * empty buffer. If we fault or otherwise fail to use
+ * it, either the reader will consume it or it'll still
+ * be there for the next write.
+ */
+ spin_lock_irq(&pipe->wait.lock);
+
+ head = pipe->head;
+ pipe_commit_write(pipe, head + 1);
+
/* Always wake up, even if the copy fails. Otherwise
* we lock up (O_NONBLOCK-)readers that sleep due to
* syscall merging.
* FIXME! Is this really true?
*/
- do_wakeup = 1;
- copied = copy_page_from_iter(page, 0, PAGE_SIZE, from);
- if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) {
- if (!ret)
- ret = -EFAULT;
- break;
- }
- ret += copied;
+ wake_up_interruptible_sync_poll_locked(
+ &pipe->wait, EPOLLIN | EPOLLRDNORM);
+
+ spin_unlock_irq(&pipe->wait.lock);
+ kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);

/* Insert it into the buffer array */
+ buf = &pipe->bufs[head & mask];
buf->page = page;
buf->ops = &anon_pipe_buf_ops;
buf->offset = 0;
- buf->len = copied;
+ buf->len = 0;
buf->flags = 0;
if (is_packetized(filp)) {
buf->ops = &packet_pipe_buf_ops;
buf->flags = PIPE_BUF_FLAG_PACKET;
}
-
- head++;
- pipe_commit_write(pipe, head);
pipe->tmp_page = NULL;

+ copied = copy_page_from_iter(page, 0, PAGE_SIZE, from);
+ if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) {
+ if (!ret)
+ ret = -EFAULT;
+ break;
+ }
+ ret += copied;
+ buf->offset = 0;
+ buf->len = copied;
+
if (!iov_iter_count(from))
break;
}

- if (!pipe_full(head, tail, max_usage))
+ if (!pipe_full(head, pipe->tail, max_usage))
continue;

/* Wait for buffer space to become available. */

2019-10-24 14:17:55

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 09/10] pipe: Remove redundant wakeup from pipe_write() [ver #2]

Remove a redundant wakeup from pipe_write().

Signed-off-by: David Howells <[email protected]>
---

fs/pipe.c | 5 -----
1 file changed, 5 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 1bfad2212b95..3df93990dd9d 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -516,11 +516,6 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
ret = -ERESTARTSYS;
break;
}
- if (do_wakeup) {
- wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
- kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
- do_wakeup = 0;
- }
pipe->waiting_writers++;
pipe_wait(pipe);
pipe->waiting_writers--;

2019-10-25 09:34:33

by David Howells

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

I've pushed to git a new version that fixes an incomplete conversion in
pipe_zero(), ports the powerpc virtio_console driver and fixes a comment in
splice.

David

2019-10-25 13:56:15

by Peter Zijlstra

[permalink] [raw]
Subject: Re: [RFC PATCH 00/10] pipe: Notification queue preparation [ver #2]

On Wed, Oct 23, 2019 at 09:17:04PM +0100, David Howells wrote:

> (1) It removes the nr_exclusive argument from __wake_up_sync_key() as this
> is always 1. This prepares for step 2.
>
> (2) Adds wake_up_interruptible_sync_poll_locked() so that poll can be
> woken up from a function that's holding the poll waitqueue spinlock.

> include/linux/wait.h | 11 +-
> kernel/sched/wait.c | 37 ++++--
>

Acked-by: Peter Zijlstra (Intel) <[email protected]>

2019-10-25 14:22:53

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

pipe: Add fsync() support

The keyrings testsuite needs the ability to wait for all the outstanding
notifications in the queue to have been processed so that it can then go
through them to find out whether the notifications it expected have been
emitted.

Implement fsync() support for pipes to provide this. The tailmost buffer
at the point of calling is marked and fsync adds itself to the list of
waiters, noting the tail position to be waited for and marking the buffer
as no longer mergeable. Then when the buffer is consumed, if the flag is
set, any matching waiters are woken up.

Signed-off-by: David Howells <[email protected]>
---
fs/fuse/dev.c | 1
fs/pipe.c | 61 ++++++++++++++++++++++++++++++++++++++++++++++
fs/splice.c | 3 ++
include/linux/pipe_fs_i.h | 22 ++++++++++++++++
lib/iov_iter.c | 2 -
5 files changed, 88 insertions(+), 1 deletion(-)


diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 5ef57a322cb8..9617a35579cb 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -1983,6 +1983,7 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
if (rem >= ibuf->len) {
*obuf = *ibuf;
ibuf->ops = NULL;
+ pipe_wake_fsync(pipe, ibuf, tail);
tail++;
pipe_commit_read(pipe, tail);
} else {
diff --git a/fs/pipe.c b/fs/pipe.c
index 6a982a88f658..8e5fd7314be1 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -30,6 +30,12 @@

#include "internal.h"

+struct pipe_fsync {
+ struct list_head link; /* Link in pipe->fsync */
+ struct completion done;
+ unsigned int tail; /* The buffer being waited for */
+};
+
/*
* The max size that a non-root user is allowed to grow the pipe. Can
* be set by root in /proc/sys/fs/pipe-max-size
@@ -269,6 +275,58 @@ static bool pipe_buf_can_merge(struct pipe_buffer *buf)
return buf->ops == &anon_pipe_buf_ops;
}

+/*
+ * Wait for all the data currently in the pipe to be consumed.
+ */
+static int pipe_fsync(struct file *file, loff_t a, loff_t b, int datasync)
+{
+ struct pipe_inode_info *pipe = file->private_data;
+ struct pipe_buffer *buf;
+ struct pipe_fsync fsync;
+ unsigned int head, tail, mask;
+
+ pipe_lock(pipe);
+
+ head = pipe->head;
+ tail = pipe->tail;
+ mask = pipe->ring_size - 1;
+
+ if (pipe_empty(head, tail)) {
+ pipe_unlock(pipe);
+ return 0;
+ }
+
+ init_completion(&fsync.done);
+ fsync.tail = tail;
+ buf = &pipe->bufs[tail & mask];
+ buf->flags |= PIPE_BUF_FLAG_FSYNC;
+ pipe_buf_mark_unmergeable(buf);
+ list_add_tail(&fsync.link, &pipe->fsync);
+ pipe_unlock(pipe);
+
+ if (wait_for_completion_interruptible(&fsync.done) < 0) {
+ pipe_lock(pipe);
+ list_del(&fsync.link);
+ pipe_unlock(pipe);
+ return -EINTR;
+ }
+
+ return 0;
+}
+
+void __pipe_wake_fsync(struct pipe_inode_info *pipe, unsigned int tail)
+{
+ struct pipe_fsync *fsync, *p;
+
+ list_for_each_entry_safe(fsync, p, &pipe->fsync, link) {
+ if (fsync->tail == tail) {
+ list_del_init(&fsync->link);
+ complete(&fsync->done);
+ }
+ }
+}
+EXPORT_SYMBOL(__pipe_wake_fsync);
+
static ssize_t
pipe_read(struct kiocb *iocb, struct iov_iter *to)
{
@@ -325,6 +383,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
if (!buf->len) {
pipe_buf_release(pipe, buf);
spin_lock_irq(&pipe->wait.lock);
+ pipe_wake_fsync(pipe, buf, tail);
tail++;
pipe_commit_read(pipe, tail);
do_wakeup = 1;
@@ -717,6 +776,7 @@ struct pipe_inode_info *alloc_pipe_info(void)
pipe->ring_size = pipe_bufs;
pipe->user = user;
mutex_init(&pipe->mutex);
+ INIT_LIST_HEAD(&pipe->fsync);
return pipe;
}

@@ -1060,6 +1120,7 @@ const struct file_operations pipefifo_fops = {
.llseek = no_llseek,
.read_iter = pipe_read,
.write_iter = pipe_write,
+ .fsync = pipe_fsync,
.poll = pipe_poll,
.unlocked_ioctl = pipe_ioctl,
.release = pipe_release,
diff --git a/fs/splice.c b/fs/splice.c
index 3f72bc31b6ec..e106367e1be6 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -523,6 +523,7 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des

if (!buf->len) {
pipe_buf_release(pipe, buf);
+ pipe_wake_fsync(pipe, buf, tail);
tail++;
pipe_commit_read(pipe, tail);
if (pipe->files)
@@ -771,6 +772,7 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
ret -= buf->len;
buf->len = 0;
pipe_buf_release(pipe, buf);
+ pipe_wake_fsync(pipe, buf, tail);
tail++;
pipe_commit_read(pipe, tail);
if (pipe->files)
@@ -1613,6 +1615,7 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
*/
*obuf = *ibuf;
ibuf->ops = NULL;
+ pipe_wake_fsync(ipipe, ibuf, i_tail);
i_tail++;
pipe_commit_read(ipipe, i_tail);
input_wakeup = true;
diff --git a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
index 90055ff16550..1a3027089558 100644
--- a/include/linux/pipe_fs_i.h
+++ b/include/linux/pipe_fs_i.h
@@ -8,6 +8,7 @@
#define PIPE_BUF_FLAG_ATOMIC 0x02 /* was atomically mapped */
#define PIPE_BUF_FLAG_GIFT 0x04 /* page is a gift */
#define PIPE_BUF_FLAG_PACKET 0x08 /* read() as a packet */
+#define PIPE_BUF_FLAG_FSYNC 0x10 /* fsync() is waiting for this buffer to die */

/**
* struct pipe_buffer - a linux kernel pipe buffer
@@ -43,6 +44,7 @@ struct pipe_buffer {
* @w_counter: writer counter
* @fasync_readers: reader side fasync
* @fasync_writers: writer side fasync
+ * @fsync: Waiting fsyncs
* @bufs: the circular array of pipe buffers
* @user: the user who created this pipe
**/
@@ -62,6 +64,7 @@ struct pipe_inode_info {
struct page *tmp_page;
struct fasync_struct *fasync_readers;
struct fasync_struct *fasync_writers;
+ struct list_head fsync;
struct pipe_buffer *bufs;
struct user_struct *user;
};
@@ -268,6 +271,25 @@ extern const struct pipe_buf_operations nosteal_pipe_buf_ops;
long pipe_fcntl(struct file *, unsigned int, unsigned long arg);
struct pipe_inode_info *get_pipe_info(struct file *file);

+void __pipe_wake_fsync(struct pipe_inode_info *pipe, unsigned int tail);
+
+/**
+ * pipe_wake_fsync - Wake up anyone waiting with fsync for this point
+ * @pipe: The pipe that owns the buffer
+ * @buf: The pipe buffer in question
+ * @tail: The index in the ring of the buffer
+ *
+ * Check to see if anyone is waiting for the pipe ring to clear up to and
+ * including this buffer, and, if they are, wake them up.
+ */
+static inline void pipe_wake_fsync(struct pipe_inode_info *pipe,
+ struct pipe_buffer *buf,
+ unsigned int tail)
+{
+ if (unlikely(buf->flags & PIPE_BUF_FLAG_FSYNC))
+ __pipe_wake_fsync(pipe, tail);
+}
+
int create_pipe_files(struct file **, int);
unsigned int round_pipe_size(unsigned long size);

diff --git a/lib/iov_iter.c b/lib/iov_iter.c
index e22f4e283f6d..38d52524cd21 100644
--- a/lib/iov_iter.c
+++ b/lib/iov_iter.c
@@ -404,7 +404,7 @@ static size_t copy_page_to_iter_pipe(struct page *page, size_t offset, size_t by
buf->offset = offset;
buf->len = bytes;

- pipe_commit_read(pipe, i_head);
+ pipe_commit_write(pipe, i_head);
i->iov_offset = offset + bytes;
i->head = i_head;
out:

2019-10-25 19:08:14

by Linus Torvalds

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

On Thu, Oct 24, 2019 at 12:57 PM David Howells <[email protected]> wrote:
>
> pipe: Add fsync() support
>
> The keyrings testsuite needs the ability to wait for all the outstanding
> notifications in the queue to have been processed so that it can then go
> through them to find out whether the notifications it expected have been
> emitted.

Can't you just do

ioctl(fd, FIONREAD, &count);

in a loop instead? "No paperwork. Just sprinkle some msleep() crack on
him, and let's get out of here"

Linus

2019-10-25 22:16:52

by David Howells

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

Linus Torvalds <[email protected]> wrote:

> > The keyrings testsuite needs the ability to wait for all the outstanding
> > notifications in the queue to have been processed so that it can then go
> > through them to find out whether the notifications it expected have been
> > emitted.
>
> Can't you just do
>
> ioctl(fd, FIONREAD, &count);
>
> in a loop instead? "No paperwork. Just sprinkle some msleep() crack on
> him, and let's get out of here"

Using FIONREAD like this means that I would have to quiesce the tests in order
to sync up. For the moment that's fine, but at some point I would like to be
able to stress test the system by running tests in parallel against the same
keyring. Each test needs to check with the monitor whether its keys have
generated the appropriate notifications against a backdrop of events being
continuously generated by other tests.

I can hold this patch for now. Let me see if I can come up with a better way
to do it. Maybe it can be done by dead reckoning, holding up until either
we've counted out a complete ring-full of notifications or read() has come up
empty.

David

2019-10-26 15:00:11

by kernel test robot

[permalink] [raw]
Subject: [pipe] 6567a02d20: BUG:kernel_NULL_pointer_dereference,address

FYI, we noticed the following commit (built with gcc-7):

commit: 6567a02d20732ad1e4f5f193f2dd59c467209a18 ("[RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]")
url: https://github.com/0day-ci/linux/commits/David-Howells/pipe-Notification-queue-preparation-ver-2/20191026-015701


in testcase: boot

on test machine: qemu-system-x86_64 -enable-kvm -cpu SandyBridge -smp 2 -m 8G

caused below changes (please refer to attached dmesg/kmsg for entire log/backtrace):


+---------------------------------------------+------------+------------+
| | 485a2d006a | 6567a02d20 |
+---------------------------------------------+------------+------------+
| boot_successes | 4 | 0 |
| boot_failures | 0 | 6 |
| BUG:kernel_NULL_pointer_dereference,address | 0 | 6 |
| Oops:#[##] | 0 | 6 |
| RIP:get_page | 0 | 6 |
| Kernel_panic-not_syncing:Fatal_exception | 0 | 6 |
+---------------------------------------------+------------+------------+


If you fix the issue, kindly add following tag
Reported-by: kernel test robot <[email protected]>


[ 4.869805] BUG: kernel NULL pointer dereference, address: 0000000000000008
[ 4.871685] #PF: supervisor read access in kernel mode
[ 4.873154] #PF: error_code(0x0000) - not-present page
[ 4.874705] PGD 800000021f014067 P4D 800000021f014067 PUD 21f012067 PMD 0
[ 4.876481] Oops: 0000 [#1] SMP PTI
[ 4.877769] CPU: 1 PID: 1793 Comm: cat Not tainted 5.4.0-rc4-00108-g6567a02d20732 #1
[ 4.880212] Hardware name: QEMU Standard PC (i440FX + PIIX, 1996), BIOS 1.10.2-1 04/01/2014
[ 4.882767] RIP: 0010:get_page+0x0/0x2a
[ 4.884038] Code: 44 89 28 48 8b 4c 24 48 65 48 33 0c 25 28 00 00 00 48 89 e8 74 05 e8 3e 05 cb ff 48 83 c4 50 5b 5d 41 5c 41 5d 41 5e 41 5f c3 <48> 8b 47 08 a8 01 74 04 48 8d 78 ff 8b 47 34 83 c0 7f 83 f8 7f 77
[ 4.890976] RSP: 0000:ffffc900000d7b70 EFLAGS: 00010286
[ 4.892474] RAX: 0000000000010000 RBX: ffff88821f66d090 RCX: 0000000000000000
[ 4.894293] RDX: ffff88821f6ef828 RSI: 0000000000000000 RDI: 0000000000000000
[ 4.896127] RBP: 000000000000000f R08: ffffc900000d7c48 R09: 0000000000240000
[ 4.898021] R10: ffffc900000d7b84 R11: 0000000000000000 R12: 000000000000e000
[ 4.899905] R13: ffff88821f45b240 R14: ffffc900000d7c40 R15: 0000000000010000
[ 4.901779] FS: 0000000000000000(0000) GS:ffff88823fd00000(0063) knlGS:00000000f7fd3de4
[ 4.904331] CS: 0010 DS: 002b ES: 002b CR0: 0000000080050033
[ 4.905912] CR2: 0000000000000008 CR3: 000000021f366000 CR4: 00000000000406e0
[ 4.907765] Call Trace:
[ 4.908816] __pipe_get_pages+0x6a/0x86
[ 4.910087] iov_iter_get_pages_alloc+0xdc/0x380
[ 4.911484] ? ___might_sleep+0x3b/0x144
[ 4.912728] default_file_splice_read+0xa5/0x28a
[ 4.914178] ? ___might_sleep+0x3b/0x144
[ 4.915467] ? ___might_sleep+0x3b/0x144
[ 4.916747] ? _cond_resched+0x25/0x29
[ 4.917990] ? get_page_from_freelist+0x864/0xb3d
[ 4.919411] ? fast_dput+0x25/0x82
[ 4.920601] ? ___might_sleep+0x3b/0x144
[ 4.921877] ? _cond_resched+0x25/0x29
[ 4.923159] ? slab_pre_alloc_hook+0x35/0x61
[ 4.924493] ? __kmalloc+0x132/0x141
[ 4.925680] ? alloc_pipe_info+0xd7/0x15c
[ 4.926983] ? splice_direct_to_actor+0xef/0x1c8
[ 4.928380] splice_direct_to_actor+0xef/0x1c8
[ 4.929740] ? generic_file_splice_read+0x171/0x171
[ 4.931236] do_splice_direct+0x99/0xc2
[ 4.932507] do_sendfile+0x175/0x23f
[ 4.933727] __do_sys_sendfile64+0x8e/0xb2
[ 4.935044] do_int80_syscall_32+0x50/0x5d
[ 4.936342] entry_INT80_compat+0x82/0x90
[ 4.937612] Modules linked in:
[ 4.938739] CR2: 0000000000000008
[ 4.939973] ---[ end trace a2dd9b34228ecd09 ]---


To reproduce:

# build kernel
cd linux
cp config-5.4.0-rc4-00108-g6567a02d20732 .config
make HOSTCC=gcc-7 CC=gcc-7 ARCH=x86_64 olddefconfig prepare modules_prepare bzImage

git clone https://github.com/intel/lkp-tests.git
cd lkp-tests
bin/lkp qemu -k <bzImage> job-script # job-script is attached in this email



Thanks,
lkp


Attachments:
(No filename) (4.31 kB)
config-5.4.0-rc4-00108-g6567a02d20732 (115.74 kB)
job-script (4.77 kB)
dmesg.xz (12.82 kB)
Download all attachments

2019-10-27 14:05:15

by Linus Torvalds

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

This still has signs of that earlier series:

On Wed, Oct 23, 2019 at 4:17 PM David Howells <[email protected]> wrote:
>
> if (rem >= ibuf->len) {
> *obuf = *ibuf;
> ibuf->ops = NULL;
> - pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
> - pipe->nrbufs--;
> + tail++;
> + pipe_commit_read(pipe, tail);
> } else {
> if (!pipe_buf_get(pipe, ibuf))
> goto out_free;

with those odd "pipe_commit_read/write()" helpers.

They make no sense, and they don't make things more legible.

It's shorter and more obvious to just write

pipe->head = head;

than it is to write

pipe_commit_write(pipe, head);

Even when the addition of the notifications, it's all under the
pipe->wait.lock, so it's all just regular assignments.

Now, if at some point it starts doing fancy lockless things, at _that_
point the updates might become more complex, but that's a potential
future thing that wouldn't be relevant for a while, and isn't a reason
to make the code more obscure now.

Hmm?

Linus

2019-10-27 16:54:44

by Christoph Hellwig

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

On Thu, Oct 24, 2019 at 05:57:32PM +0100, David Howells wrote:
> pipe: Add fsync() support
>
> The keyrings testsuite needs the ability to wait for all the outstanding
> notifications in the queue to have been processed so that it can then go
> through them to find out whether the notifications it expected have been
> emitted.
>
> Implement fsync() support for pipes to provide this. The tailmost buffer
> at the point of calling is marked and fsync adds itself to the list of
> waiters, noting the tail position to be waited for and marking the buffer
> as no longer mergeable. Then when the buffer is consumed, if the flag is
> set, any matching waiters are woken up.

I am _really_ worried about overloading fsync for this behavior. fsync
hasn't done anything for 50 years, and suddenly adding any action
is not helpful. If you can't use FIONREAD please add a new ioctls
instead, and document it properly.

2019-10-27 17:00:26

by Konstantin Khlebnikov

[permalink] [raw]
Subject: Re: [RFC PATCH 07/10] pipe: Conditionalise wakeup in pipe_read() [ver #2]

On 23/10/2019 23.18, David Howells wrote:
> Only do a wakeup in pipe_read() if we made space in a completely full
> buffer. The producer shouldn't be waiting on pipe->wait otherwise.

We could go further and wakeup writer only when at least half of buffer is empty.
This gives better batching and reduces rate of context switches.

https://lore.kernel.org/lkml/157219118016.7078.16223055699799396042.stgit@buzz/T/#u

>
> Signed-off-by: David Howells <[email protected]>
> ---
>
> fs/pipe.c | 15 ++++++---------
> 1 file changed, 6 insertions(+), 9 deletions(-)
>
> diff --git a/fs/pipe.c b/fs/pipe.c
> index 1274305772fb..e3a8f10750c9 100644
> --- a/fs/pipe.c
> +++ b/fs/pipe.c
> @@ -327,11 +327,13 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
> spin_lock_irq(&pipe->wait.lock);
> tail++;
> pipe_commit_read(pipe, tail);
> - do_wakeup = 0;
> - wake_up_interruptible_sync_poll_locked(
> - &pipe->wait, EPOLLOUT | EPOLLWRNORM);
> + do_wakeup = 1;
> + if (head - (tail - 1) == pipe->max_usage)
> + wake_up_interruptible_sync_poll_locked(
> + &pipe->wait, EPOLLOUT | EPOLLWRNORM);
> spin_unlock_irq(&pipe->wait.lock);
> - kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
> + if (head - (tail - 1) == pipe->max_usage)
> + kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
> }
> total_len -= chars;
> if (!total_len)
> @@ -360,11 +362,6 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
> ret = -ERESTARTSYS;
> break;
> }
> - if (do_wakeup) {
> - wake_up_interruptible_sync_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
> - kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
> - do_wakeup = 0;
> - }
> pipe_wait(pipe);
> }
> __pipe_unlock(pipe);
>
>

2019-10-27 18:23:47

by Konstantin Khlebnikov

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

On 24/10/2019 19.57, David Howells wrote:
> pipe: Add fsync() support
>
> The keyrings testsuite needs the ability to wait for all the outstanding
> notifications in the queue to have been processed so that it can then go
> through them to find out whether the notifications it expected have been
> emitted.

Similar synchronization is required for reusing memory after vmsplice()?
I don't see other way how sender could safely change these pages.

>
> Implement fsync() support for pipes to provide this. The tailmost buffer
> at the point of calling is marked and fsync adds itself to the list of
> waiters, noting the tail position to be waited for and marking the buffer
> as no longer mergeable. Then when the buffer is consumed, if the flag is
> set, any matching waiters are woken up.
>
> Signed-off-by: David Howells <[email protected]>
> ---
> fs/fuse/dev.c | 1
> fs/pipe.c | 61 ++++++++++++++++++++++++++++++++++++++++++++++
> fs/splice.c | 3 ++
> include/linux/pipe_fs_i.h | 22 ++++++++++++++++
> lib/iov_iter.c | 2 -
> 5 files changed, 88 insertions(+), 1 deletion(-)
>
>
> diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
> index 5ef57a322cb8..9617a35579cb 100644
> --- a/fs/fuse/dev.c
> +++ b/fs/fuse/dev.c
> @@ -1983,6 +1983,7 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
> if (rem >= ibuf->len) {
> *obuf = *ibuf;
> ibuf->ops = NULL;
> + pipe_wake_fsync(pipe, ibuf, tail);
> tail++;
> pipe_commit_read(pipe, tail);
> } else {
> diff --git a/fs/pipe.c b/fs/pipe.c
> index 6a982a88f658..8e5fd7314be1 100644
> --- a/fs/pipe.c
> +++ b/fs/pipe.c
> @@ -30,6 +30,12 @@
>
> #include "internal.h"
>
> +struct pipe_fsync {
> + struct list_head link; /* Link in pipe->fsync */
> + struct completion done;
> + unsigned int tail; /* The buffer being waited for */
> +};
> +
> /*
> * The max size that a non-root user is allowed to grow the pipe. Can
> * be set by root in /proc/sys/fs/pipe-max-size
> @@ -269,6 +275,58 @@ static bool pipe_buf_can_merge(struct pipe_buffer *buf)
> return buf->ops == &anon_pipe_buf_ops;
> }
>
> +/*
> + * Wait for all the data currently in the pipe to be consumed.
> + */
> +static int pipe_fsync(struct file *file, loff_t a, loff_t b, int datasync)
> +{
> + struct pipe_inode_info *pipe = file->private_data;
> + struct pipe_buffer *buf;
> + struct pipe_fsync fsync;
> + unsigned int head, tail, mask;
> +
> + pipe_lock(pipe);
> +
> + head = pipe->head;
> + tail = pipe->tail;
> + mask = pipe->ring_size - 1;
> +
> + if (pipe_empty(head, tail)) {
> + pipe_unlock(pipe);
> + return 0;
> + }
> +
> + init_completion(&fsync.done);
> + fsync.tail = tail;
> + buf = &pipe->bufs[tail & mask];
> + buf->flags |= PIPE_BUF_FLAG_FSYNC;
> + pipe_buf_mark_unmergeable(buf);
> + list_add_tail(&fsync.link, &pipe->fsync);
> + pipe_unlock(pipe);
> +
> + if (wait_for_completion_interruptible(&fsync.done) < 0) {
> + pipe_lock(pipe);
> + list_del(&fsync.link);
> + pipe_unlock(pipe);
> + return -EINTR;
> + }
> +
> + return 0;
> +}
> +
> +void __pipe_wake_fsync(struct pipe_inode_info *pipe, unsigned int tail)
> +{
> + struct pipe_fsync *fsync, *p;
> +
> + list_for_each_entry_safe(fsync, p, &pipe->fsync, link) {
> + if (fsync->tail == tail) {
> + list_del_init(&fsync->link);
> + complete(&fsync->done);
> + }
> + }
> +}
> +EXPORT_SYMBOL(__pipe_wake_fsync);
> +
> static ssize_t
> pipe_read(struct kiocb *iocb, struct iov_iter *to)
> {
> @@ -325,6 +383,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
> if (!buf->len) {
> pipe_buf_release(pipe, buf);
> spin_lock_irq(&pipe->wait.lock);
> + pipe_wake_fsync(pipe, buf, tail);
> tail++;
> pipe_commit_read(pipe, tail);
> do_wakeup = 1;
> @@ -717,6 +776,7 @@ struct pipe_inode_info *alloc_pipe_info(void)
> pipe->ring_size = pipe_bufs;
> pipe->user = user;
> mutex_init(&pipe->mutex);
> + INIT_LIST_HEAD(&pipe->fsync);
> return pipe;
> }
>
> @@ -1060,6 +1120,7 @@ const struct file_operations pipefifo_fops = {
> .llseek = no_llseek,
> .read_iter = pipe_read,
> .write_iter = pipe_write,
> + .fsync = pipe_fsync,
> .poll = pipe_poll,
> .unlocked_ioctl = pipe_ioctl,
> .release = pipe_release,
> diff --git a/fs/splice.c b/fs/splice.c
> index 3f72bc31b6ec..e106367e1be6 100644
> --- a/fs/splice.c
> +++ b/fs/splice.c
> @@ -523,6 +523,7 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des
>
> if (!buf->len) {
> pipe_buf_release(pipe, buf);
> + pipe_wake_fsync(pipe, buf, tail);
> tail++;
> pipe_commit_read(pipe, tail);
> if (pipe->files)
> @@ -771,6 +772,7 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
> ret -= buf->len;
> buf->len = 0;
> pipe_buf_release(pipe, buf);
> + pipe_wake_fsync(pipe, buf, tail);
> tail++;
> pipe_commit_read(pipe, tail);
> if (pipe->files)
> @@ -1613,6 +1615,7 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
> */
> *obuf = *ibuf;
> ibuf->ops = NULL;
> + pipe_wake_fsync(ipipe, ibuf, i_tail);
> i_tail++;
> pipe_commit_read(ipipe, i_tail);
> input_wakeup = true;
> diff --git a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
> index 90055ff16550..1a3027089558 100644
> --- a/include/linux/pipe_fs_i.h
> +++ b/include/linux/pipe_fs_i.h
> @@ -8,6 +8,7 @@
> #define PIPE_BUF_FLAG_ATOMIC 0x02 /* was atomically mapped */
> #define PIPE_BUF_FLAG_GIFT 0x04 /* page is a gift */
> #define PIPE_BUF_FLAG_PACKET 0x08 /* read() as a packet */
> +#define PIPE_BUF_FLAG_FSYNC 0x10 /* fsync() is waiting for this buffer to die */
>
> /**
> * struct pipe_buffer - a linux kernel pipe buffer
> @@ -43,6 +44,7 @@ struct pipe_buffer {
> * @w_counter: writer counter
> * @fasync_readers: reader side fasync
> * @fasync_writers: writer side fasync
> + * @fsync: Waiting fsyncs
> * @bufs: the circular array of pipe buffers
> * @user: the user who created this pipe
> **/
> @@ -62,6 +64,7 @@ struct pipe_inode_info {
> struct page *tmp_page;
> struct fasync_struct *fasync_readers;
> struct fasync_struct *fasync_writers;
> + struct list_head fsync;
> struct pipe_buffer *bufs;
> struct user_struct *user;
> };
> @@ -268,6 +271,25 @@ extern const struct pipe_buf_operations nosteal_pipe_buf_ops;
> long pipe_fcntl(struct file *, unsigned int, unsigned long arg);
> struct pipe_inode_info *get_pipe_info(struct file *file);
>
> +void __pipe_wake_fsync(struct pipe_inode_info *pipe, unsigned int tail);
> +
> +/**
> + * pipe_wake_fsync - Wake up anyone waiting with fsync for this point
> + * @pipe: The pipe that owns the buffer
> + * @buf: The pipe buffer in question
> + * @tail: The index in the ring of the buffer
> + *
> + * Check to see if anyone is waiting for the pipe ring to clear up to and
> + * including this buffer, and, if they are, wake them up.
> + */
> +static inline void pipe_wake_fsync(struct pipe_inode_info *pipe,
> + struct pipe_buffer *buf,
> + unsigned int tail)
> +{
> + if (unlikely(buf->flags & PIPE_BUF_FLAG_FSYNC))
> + __pipe_wake_fsync(pipe, tail);
> +}
> +
> int create_pipe_files(struct file **, int);
> unsigned int round_pipe_size(unsigned long size);
>
> diff --git a/lib/iov_iter.c b/lib/iov_iter.c
> index e22f4e283f6d..38d52524cd21 100644
> --- a/lib/iov_iter.c
> +++ b/lib/iov_iter.c
> @@ -404,7 +404,7 @@ static size_t copy_page_to_iter_pipe(struct page *page, size_t offset, size_t by
> buf->offset = offset;
> buf->len = bytes;
>
> - pipe_commit_read(pipe, i_head);
> + pipe_commit_write(pipe, i_head);
> i->iov_offset = offset + bytes;
> i->head = i_head;
> out:
>
>

2019-10-30 16:20:32

by Ilya Dryomov

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On Thu, Oct 24, 2019 at 11:49 AM David Howells <[email protected]> wrote:
>
> Convert pipes to use head and tail pointers for the buffer ring rather than
> pointer and length as the latter requires two atomic ops to update (or a
> combined op) whereas the former only requires one.
>
> (1) The head pointer is the point at which production occurs and points to
> the slot in which the next buffer will be placed. This is equivalent
> to pipe->curbuf + pipe->nrbufs.
>
> The head pointer belongs to the write-side.
>
> (2) The tail pointer is the point at which consumption occurs. It points
> to the next slot to be consumed. This is equivalent to pipe->curbuf.
>
> The tail pointer belongs to the read-side.
>
> (3) head and tail are allowed to run to UINT_MAX and wrap naturally. They
> are only masked off when the array is being accessed, e.g.:
>
> pipe->bufs[head & mask]
>
> This means that it is not necessary to have a dead slot in the ring as
> head == tail isn't ambiguous.
>
> (4) The ring is empty if "head == tail".
>
> A helper, pipe_empty(), is provided for this.
>
> (5) The occupancy of the ring is "head - tail".
>
> A helper, pipe_occupancy(), is provided for this.
>
> (6) The number of free slots in the ring is "pipe->ring_size - occupancy".
>
> A helper, pipe_space_for_user() is provided to indicate how many slots
> userspace may use.
>
> (7) The ring is full if "head - tail >= pipe->ring_size".
>
> A helper, pipe_full(), is provided for this.
>
> Signed-off-by: David Howells <[email protected]>
> ---
>
> fs/fuse/dev.c | 31 +++--
> fs/pipe.c | 169 ++++++++++++++++-------------
> fs/splice.c | 188 ++++++++++++++++++++------------
> include/linux/pipe_fs_i.h | 86 ++++++++++++++-
> include/linux/uio.h | 4 -
> lib/iov_iter.c | 266 +++++++++++++++++++++++++--------------------
> 6 files changed, 464 insertions(+), 280 deletions(-)
>
> diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
> index dadd617d826c..1e4bc27573cc 100644
> --- a/fs/fuse/dev.c
> +++ b/fs/fuse/dev.c
> @@ -703,7 +703,7 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
> cs->pipebufs++;
> cs->nr_segs--;
> } else {
> - if (cs->nr_segs == cs->pipe->buffers)
> + if (cs->nr_segs >= cs->pipe->ring_size)
> return -EIO;
>
> page = alloc_page(GFP_HIGHUSER);
> @@ -879,7 +879,7 @@ static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page,
> struct pipe_buffer *buf;
> int err;
>
> - if (cs->nr_segs == cs->pipe->buffers)
> + if (cs->nr_segs >= cs->pipe->ring_size)
> return -EIO;
>
> err = unlock_request(cs->req);
> @@ -1341,7 +1341,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
> if (!fud)
> return -EPERM;
>
> - bufs = kvmalloc_array(pipe->buffers, sizeof(struct pipe_buffer),
> + bufs = kvmalloc_array(pipe->ring_size, sizeof(struct pipe_buffer),
> GFP_KERNEL);
> if (!bufs)
> return -ENOMEM;
> @@ -1353,7 +1353,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
> if (ret < 0)
> goto out;
>
> - if (pipe->nrbufs + cs.nr_segs > pipe->buffers) {
> + if (pipe_occupancy(pipe->head, pipe->tail) + cs.nr_segs > pipe->ring_size) {
> ret = -EIO;
> goto out;
> }
> @@ -1935,6 +1935,7 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
> struct file *out, loff_t *ppos,
> size_t len, unsigned int flags)
> {
> + unsigned int head, tail, mask, count;
> unsigned nbuf;
> unsigned idx;
> struct pipe_buffer *bufs;
> @@ -1949,8 +1950,12 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>
> pipe_lock(pipe);
>
> - bufs = kvmalloc_array(pipe->nrbufs, sizeof(struct pipe_buffer),
> - GFP_KERNEL);
> + head = pipe->head;
> + tail = pipe->tail;
> + mask = pipe->ring_size - 1;
> + count = head - tail;
> +
> + bufs = kvmalloc_array(count, sizeof(struct pipe_buffer), GFP_KERNEL);
> if (!bufs) {
> pipe_unlock(pipe);
> return -ENOMEM;
> @@ -1958,8 +1963,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>
> nbuf = 0;
> rem = 0;
> - for (idx = 0; idx < pipe->nrbufs && rem < len; idx++)
> - rem += pipe->bufs[(pipe->curbuf + idx) & (pipe->buffers - 1)].len;
> + for (idx = tail; idx < head && rem < len; idx++)
> + rem += pipe->bufs[idx & mask].len;
>
> ret = -EINVAL;
> if (rem < len)
> @@ -1970,16 +1975,16 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
> struct pipe_buffer *ibuf;
> struct pipe_buffer *obuf;
>
> - BUG_ON(nbuf >= pipe->buffers);
> - BUG_ON(!pipe->nrbufs);
> - ibuf = &pipe->bufs[pipe->curbuf];
> + BUG_ON(nbuf >= pipe->ring_size);
> + BUG_ON(tail == head);
> + ibuf = &pipe->bufs[tail & mask];
> obuf = &bufs[nbuf];
>
> if (rem >= ibuf->len) {
> *obuf = *ibuf;
> ibuf->ops = NULL;
> - pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
> - pipe->nrbufs--;
> + tail++;
> + pipe_commit_read(pipe, tail);
> } else {
> if (!pipe_buf_get(pipe, ibuf))
> goto out_free;
> diff --git a/fs/pipe.c b/fs/pipe.c
> index 8a2ab2f974bd..8a0806fe12d3 100644
> --- a/fs/pipe.c
> +++ b/fs/pipe.c
> @@ -43,10 +43,11 @@ unsigned long pipe_user_pages_hard;
> unsigned long pipe_user_pages_soft = PIPE_DEF_BUFFERS * INR_OPEN_CUR;
>
> /*
> - * We use a start+len construction, which provides full use of the
> - * allocated memory.
> - * -- Florian Coosmann (FGC)
> - *
> + * We use head and tail indices that aren't masked off, except at the point of
> + * dereference, but rather they're allowed to wrap naturally. This means there
> + * isn't a dead spot in the buffer, provided the ring size < INT_MAX.
> + * -- David Howells 2019-09-23.

Hi David,

Is "ring size < INT_MAX" constraint correct?

I've never had to implement this free running indices scheme, but
the way I've always visualized it is that the top bit of the index is
used as a lap (as in a race) indicator, leaving 31 bits to work with
(in case of unsigned ints). Should that be

ring size <= 2^31

or more precisely

ring size is a power of two <= 2^31

or am I missing something?

Thanks,

Ilya

2019-10-30 20:36:59

by Rasmus Villemoes

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On 30/10/2019 17.19, Ilya Dryomov wrote:
> On Thu, Oct 24, 2019 at 11:49 AM David Howells <[email protected]> wrote:
>> /*
>> - * We use a start+len construction, which provides full use of the
>> - * allocated memory.
>> - * -- Florian Coosmann (FGC)
>> - *
>> + * We use head and tail indices that aren't masked off, except at the point of
>> + * dereference, but rather they're allowed to wrap naturally. This means there
>> + * isn't a dead spot in the buffer, provided the ring size < INT_MAX.
>> + * -- David Howells 2019-09-23.
>
> Hi David,
>
> Is "ring size < INT_MAX" constraint correct?

No. As long as one always uses a[idx % size] to access the array, the
only requirement is that size is representable in an unsigned int. Then
because one also wants to do the % using simple bitmasking, that further
restricts one to sizes that are a power of 2, so the end result is that
the max size is 2^31 (aka INT_MAX+1).

> I've never had to implement this free running indices scheme, but
> the way I've always visualized it is that the top bit of the index is
> used as a lap (as in a race) indicator, leaving 31 bits to work with
> (in case of unsigned ints). Should that be
>
> ring size <= 2^31
>
> or more precisely
>
> ring size is a power of two <= 2^31

Exactly. But it's kind of moot since the ring size would never be
allowed to grow anywhere near that.

Rasmus

2019-10-30 22:19:43

by Ilya Dryomov

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On Wed, Oct 30, 2019 at 9:35 PM Rasmus Villemoes
<[email protected]> wrote:
>
> On 30/10/2019 17.19, Ilya Dryomov wrote:
> > On Thu, Oct 24, 2019 at 11:49 AM David Howells <[email protected]> wrote:
> >> /*
> >> - * We use a start+len construction, which provides full use of the
> >> - * allocated memory.
> >> - * -- Florian Coosmann (FGC)
> >> - *
> >> + * We use head and tail indices that aren't masked off, except at the point of
> >> + * dereference, but rather they're allowed to wrap naturally. This means there
> >> + * isn't a dead spot in the buffer, provided the ring size < INT_MAX.
> >> + * -- David Howells 2019-09-23.
> >
> > Hi David,
> >
> > Is "ring size < INT_MAX" constraint correct?
>
> No. As long as one always uses a[idx % size] to access the array, the
> only requirement is that size is representable in an unsigned int. Then
> because one also wants to do the % using simple bitmasking, that further
> restricts one to sizes that are a power of 2, so the end result is that
> the max size is 2^31 (aka INT_MAX+1).

I think the fact that indices are free running and wrap at a power of
two already restricts you to sizes the are a power of two, independent
of how you do masking. If you switch to a[idx % size], size still has
to be a power of two for things to work when idx wraps. Consider:

size = 6
head = tail = 4294967292, empty buffer

push 4294967292 % 6 = 0
push 4294967293 % 6 = 1
push 4294967294 % 6 = 2
push 4294967295 % 6 = 3
push 0 % 6 = 0 <-- expected 4, overwrote a[0]

>
> > I've never had to implement this free running indices scheme, but
> > the way I've always visualized it is that the top bit of the index is
> > used as a lap (as in a race) indicator, leaving 31 bits to work with
> > (in case of unsigned ints). Should that be
> >
> > ring size <= 2^31
> >
> > or more precisely
> >
> > ring size is a power of two <= 2^31
>
> Exactly. But it's kind of moot since the ring size would never be
> allowed to grow anywhere near that.

Thanks for confirming. Even if it's kind of moot, I think it should be
corrected to avoid confusion.

Ilya

2019-10-30 22:43:26

by Rasmus Villemoes

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On 30/10/2019 23.16, Ilya Dryomov wrote:
> On Wed, Oct 30, 2019 at 9:35 PM Rasmus Villemoes
> <[email protected]> wrote:
>>
>> On 30/10/2019 17.19, Ilya Dryomov wrote:
>>> On Thu, Oct 24, 2019 at 11:49 AM David Howells <[email protected]> wrote:
>>>> /*
>>>> - * We use a start+len construction, which provides full use of the
>>>> - * allocated memory.
>>>> - * -- Florian Coosmann (FGC)
>>>> - *
>>>> + * We use head and tail indices that aren't masked off, except at the point of
>>>> + * dereference, but rather they're allowed to wrap naturally. This means there
>>>> + * isn't a dead spot in the buffer, provided the ring size < INT_MAX.
>>>> + * -- David Howells 2019-09-23.
>>>
>>> Hi David,
>>>
>>> Is "ring size < INT_MAX" constraint correct?
>>
>> No. As long as one always uses a[idx % size] to access the array, the
>> only requirement is that size is representable in an unsigned int. Then
>> because one also wants to do the % using simple bitmasking, that further
>> restricts one to sizes that are a power of 2, so the end result is that
>> the max size is 2^31 (aka INT_MAX+1).
>
> I think the fact that indices are free running and wrap at a power of
> two already restricts you to sizes the are a power of two,

Ah, yes, of course. When reducing indices mod n that may already have
been implicitly reduced mod N, N must be a multiple of n for the result
to be well-defined.

Rasmus

2019-10-31 14:59:28

by David Howells

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

Linus Torvalds <[email protected]> wrote:

> It's shorter and more obvious to just write
>
> pipe->head = head;
>
> than it is to write
>
> pipe_commit_write(pipe, head);

But easier to find the latter. But whatever.

David

2019-10-31 15:12:46

by David Howells

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

How about:

* We use head and tail indices that aren't masked off, except at the
* point of dereference, but rather they're allowed to wrap naturally.
* This means there isn't a dead spot in the buffer, provided the ring
* size is a power of two and <= 2^31.

David

2019-10-31 15:15:46

by David Howells

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

Christoph Hellwig <[email protected]> wrote:

> I am _really_ worried about overloading fsync for this behavior. fsync
> hasn't done anything for 50 years, and suddenly adding any action
> is not helpful. If you can't use FIONREAD please add a new ioctls
> instead, and document it properly.

Okay.

David

2019-10-31 15:17:51

by David Howells

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

Konstantin Khlebnikov <[email protected]> wrote:

> Similar synchronization is required for reusing memory after vmsplice()?
> I don't see other way how sender could safely change these pages.

Sounds like a point - if you have multiple parallel contributors to the pipe
via vmsplice(), then FIONREAD is of no use. To use use FIONREAD, you have to
let the pipe become empty before you can be sure.

David

2019-10-31 15:23:10

by David Howells

[permalink] [raw]
Subject: Re: [RFC PATCH 07/10] pipe: Conditionalise wakeup in pipe_read() [ver #2]

Konstantin Khlebnikov <[email protected]> wrote:

> > Only do a wakeup in pipe_read() if we made space in a completely full
> > buffer. The producer shouldn't be waiting on pipe->wait otherwise.
>
> We could go further and wakeup writer only when at least half of buffer is
> empty. This gives better batching and reduces rate of context switches.
>
> https://lore.kernel.org/lkml/157219118016.7078.16223055699799396042.stgit@buzz/T/#u

Yeah, I saw that. I suspect that where you put the slider may depend on the
context.

David

2019-10-31 21:10:25

by Ilya Dryomov

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On Thu, Oct 31, 2019 at 4:11 PM David Howells <[email protected]> wrote:
>
> How about:
>
> * We use head and tail indices that aren't masked off, except at the
> * point of dereference, but rather they're allowed to wrap naturally.
> * This means there isn't a dead spot in the buffer, provided the ring
> * size is a power of two and <= 2^31.

To me "provided" reads like this thing works without a dead spot or
with a dead spot, depending on whether the condition is met. I would
say:

> * This means there isn't a dead spot in the buffer, but the ring
> * size has to be a power of two and <= 2^31.

Thanks,

Ilya

2019-10-31 21:14:06

by David Howells

[permalink] [raw]
Subject: Re: [RFC PATCH 07/10] pipe: Conditionalise wakeup in pipe_read() [ver #2]

Okay, attached is a change that might give you what you want. I tried my
pipe-bench program (see cover note) with perf. The output of the program with
the patch applied was:

- pipe 305127298 36262221772 302185181 7887690

The output of perf with the patch applied:

239,943.92 msec task-clock # 1.997 CPUs utilized
17,728 context-switches # 73.884 M/sec
124 cpu-migrations # 0.517 M/sec
9,330 page-faults # 38.884 M/sec
885,107,207,365 cycles # 3688822.793 GHz
1,386,873,499,490 instructions # 1.57 insn per cycle
311,037,372,339 branches # 1296296921.931 M/sec
33,467,827 branch-misses # 0.01% of all branches

And without:

239,891.87 msec task-clock # 1.997 CPUs utilized
22,187 context-switches # 92.488 M/sec
133 cpu-migrations # 0.554 M/sec
9,334 page-faults # 38.909 M/sec
884,906,976,128 cycles # 3688787.725 GHz
1,391,986,932,265 instructions # 1.57 insn per cycle
311,394,686,857 branches # 1298067400.849 M/sec
30,242,823 branch-misses # 0.01% of all branches

So it did make something like a 20% reduction in context switches.

David
---
diff --git a/fs/pipe.c b/fs/pipe.c
index e3d5f7a39123..5167921edd73 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -276,7 +276,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
size_t total_len = iov_iter_count(to);
struct file *filp = iocb->ki_filp;
struct pipe_inode_info *pipe = filp->private_data;
- int do_wakeup;
+ int do_wakeup, wake;
ssize_t ret;

/* Null read succeeds. */
@@ -329,11 +329,12 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
tail++;
pipe->tail = tail;
do_wakeup = 1;
- if (head - (tail - 1) == pipe->max_usage)
+ wake = head - (tail - 1) == pipe->max_usage / 2;
+ if (wake)
wake_up_interruptible_sync_poll_locked(
&pipe->wait, EPOLLOUT | EPOLLWRNORM);
spin_unlock_irq(&pipe->wait.lock);
- if (head - (tail - 1) == pipe->max_usage)
+ if (wake)
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}
total_len -= chars;

2019-11-01 14:56:40

by David Howells

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

Ilya Dryomov <[email protected]> wrote:

> > * This means there isn't a dead spot in the buffer, but the ring
> > * size has to be a power of two and <= 2^31.

I'll go with that, thanks.

David

2019-11-02 18:57:09

by Linus Torvalds

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

On Thu, Oct 31, 2019 at 8:16 AM David Howells <[email protected]> wrote:
>
> Konstantin Khlebnikov <[email protected]> wrote:
>
> > Similar synchronization is required for reusing memory after vmsplice()?
> > I don't see other way how sender could safely change these pages.
>
> Sounds like a point - if you have multiple parallel contributors to the pipe
> via vmsplice(), then FIONREAD is of no use. To use use FIONREAD, you have to
> let the pipe become empty before you can be sure.

Well, the rules for vmsplice is simply to not change the source pages.
It's zero-copy, after all.

If you want to change the source pages, you need to just use write() instead.

That said, even then the right model isn't fsync(). If you really want
to have something like "notify me when this buffer has been used", it
should be some kind of sequence count thing, not a "wait for empty".

Which might be useful in theory, but would be something quite
different (and honestly, I wouldn't expect it to find all that
widespread use)

Linus

2019-11-02 19:36:18

by David Howells

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

Linus Torvalds <[email protected]> wrote:

> > > Similar synchronization is required for reusing memory after vmsplice()?
> > > I don't see other way how sender could safely change these pages.

Actually, it's probably worse than that. If the output of the pipe gets teed
or spliced somewhere else, you still don't know when the vmspliced pages are
finished with.

David

2019-11-02 20:33:14

by Andy Lutomirski

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]



> On Nov 2, 2019, at 12:34 PM, David Howells <[email protected]> wrote:
>
> Linus Torvalds <[email protected]> wrote:
>
>>>> Similar synchronization is required for reusing memory after vmsplice()?
>>>> I don't see other way how sender could safely change these pages.
>
> Actually, it's probably worse than that. If the output of the pipe gets teed
> or spliced somewhere else, you still don't know when the vmspliced pages are
> finished with.
>
>

I sometimes wonder whether vmsplice should be disallowed or severely restricted. Even ignoring these usability issues, it makes me very uncomfortable that you can have some data queue up on a pipe, tee() it, and get *different* data in the original pipe and the teed copy because the sender used vmsplice and is messing with you.

Add in the fact that it’s not obvious that vmsplice *can* be used correctly, and I’m wondering if we should just remove it or make it just do write() under the hood.

I suppose the kernel could guarantee that it stops referring to the vmsplice source pages as soon as anything sees *or* tees the data. This way it would be, at least in principle, possible to say “hey, the pipe has consumed the first n vmspliced bytes, so I can reuse that memory”.

2019-11-02 22:06:23

by Linus Torvalds

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

On Sat, Nov 2, 2019 at 1:31 PM Andy Lutomirski <[email protected]> wrote:
>
> Add in the fact that it’s not obvious that vmsplice *can* be used correctly, and I’m wondering if we should just remove it or make it just do write() under the hood.

Sure it can. Just don't modify the data you vmsplice. It's really that simple.

That said, if we don't have any actual users, then we should look at
removing it (maybe turning it into "write()" as you say). Not because
it's hard to use, but simply because it probably doesn't have that
many uses.

Linus

2019-11-02 22:11:22

by Linus Torvalds

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

On Sat, Nov 2, 2019 at 3:03 PM Linus Torvalds
<[email protected]> wrote:
>
> On Sat, Nov 2, 2019 at 1:31 PM Andy Lutomirski <[email protected]> wrote:
> >
> > Add in the fact that it’s not obvious that vmsplice *can* be used correctly, and I’m wondering if we should just remove it or make it just do write() under the hood.
>
> Sure it can. Just don't modify the data you vmsplice. It's really that simple.
>
> That said, if we don't have any actual users, then we should look at
> removing it (maybe turning it into "write()" as you say). Not because
> it's hard to use, but simply because it probably doesn't have that
> many uses.

Looking at debian code search, there are _some_ uses (including
openssl and fuse):

https://codesearch.debian.net/search?q=%3D+vmsplice%28&literal=1

but I didn't check any more closely what they do.

Linus

2019-11-02 22:34:40

by Andy Lutomirski

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]



> On Nov 2, 2019, at 3:04 PM, Linus Torvalds <[email protected]> wrote:
>
> On Sat, Nov 2, 2019 at 1:31 PM Andy Lutomirski <[email protected]> wrote:
>>
>> Add in the fact that it’s not obvious that vmsplice *can* be used correctly, and I’m wondering if we should just remove it or make it just do write() under the hood.
>
> Sure it can. Just don't modify the data you vmsplice. It's really that simple.

So you allocate memory, vmsplice, and munmap() without reusing it? Just plain free() won’t be good enough. I suspect the TLB overhead will make this a loss in most workloads?

Or maybe you vmsplice from a read-only mapping of a file that you know no one modifies? This could be useful, but you can just splice() from the file directly.

2019-11-02 23:05:07

by Linus Torvalds

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

On Sat, Nov 2, 2019 at 3:30 PM Andy Lutomirski <[email protected]> wrote:
>
> So you allocate memory, vmsplice, and munmap() without reusing it?

You can re-use it as much as you want. Just don't write to it.

So the traditional argument for this was "I do a caching http server".
If you don't ever load the data into user space at all and just push
file data out, you just use splice() from the file to the target. But
if you generate some of the data in memory, and you cache it, you use
vmsplice().

And then it really is very easy to set up: make sure you generate your
caches with a new clean private mmap, and you can throw them out with
munmap (or just over-mmap it with the new cache, of course).

If you don't cache it, then there's no advantage to vmsplice() - just
write() it and forget about it. The whole (and only) point of
vmsplice() is when you want to zero-copy the data, and that's
generally likely only an advantage if you can do it multiple times.

But I don't think anybody actually _did_ any of that. But that's
basically the argument for the three splice operations:
write/vmsplice/splice(). Which one you use depends on the lifetime and
the source of your data. write() is obviously for the copy case (the
source data might not be stable), while splice() is for the "data from
another source", and vmsplace() is "data is from stable data in my
vm".

There's the reverse op, of course, but we never implemented that:
mmap() on the pipe could do the reverse of a vmsplice() (moving from
the pipe to the vm), but it would only work if everything was
page-aligned, which it effectively never is. It's basically a
benchmark-only operation.

And the existence of vmsplice() is because we actually had code to
play games with making write() do a zero-copy but mark the source as
being COW. It was _wonderful_ for benchmarks, and was completely
useless for real world case because in the real world you always took
the COW fault. So vmsplice() is basically a "hey, I know what I'm
doing, and you can just take the page as-is because the source is
stable".

Linus

2019-11-02 23:13:11

by Linus Torvalds

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

On Sat, Nov 2, 2019 at 4:02 PM Linus Torvalds
<[email protected]> wrote:
>
> But I don't think anybody actually _did_ any of that. But that's
> basically the argument for the three splice operations:
> write/vmsplice/splice(). Which one you use depends on the lifetime and
> the source of your data. write() is obviously for the copy case (the
> source data might not be stable), while splice() is for the "data from
> another source", and vmsplace() is "data is from stable data in my
> vm".

Btw, it's really worth noting that "splice()" and friends are from a
more happy-go-lucky time when we were experimenting with new
interfaces, and in a day and age when people thought that interfaces
like "sendpage()" and zero-copy and playing games with the VM was a
great thing to do.

It turns out that VM games are almost always more expensive than just
copying the data in the first place, but hey, people didn't know that,
and zero-copy was seen a big deal.

The reality is that almost nobody uses splice and vmsplice at all, and
they have been a much bigger headache than they are worth. If I could
go back in time and not do them, I would. But there have been a few
very special uses that seem to actually like the interfaces.

But it's entirely possible that we should kill vmsplice() (likely by
just implementing the semantics as "write()") because it's not common
enough to have the complexity.

Linus

2019-11-02 23:15:51

by Andy Lutomirski

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

On Sat, Nov 2, 2019 at 4:10 PM Linus Torvalds
<[email protected]> wrote:
>
> On Sat, Nov 2, 2019 at 4:02 PM Linus Torvalds
> <[email protected]> wrote:
> >
> > But I don't think anybody actually _did_ any of that. But that's
> > basically the argument for the three splice operations:
> > write/vmsplice/splice(). Which one you use depends on the lifetime and
> > the source of your data. write() is obviously for the copy case (the
> > source data might not be stable), while splice() is for the "data from
> > another source", and vmsplace() is "data is from stable data in my
> > vm".
>
> Btw, it's really worth noting that "splice()" and friends are from a
> more happy-go-lucky time when we were experimenting with new
> interfaces, and in a day and age when people thought that interfaces
> like "sendpage()" and zero-copy and playing games with the VM was a
> great thing to do.

I suppose a nicer interface might be:


madvise(buf, len, MADV_STABILIZE);

(MADV_STABILIZE is an imaginary operation that write protects the
memory a la fork() but without the copying part.)

vmsplice_safer(fd, ...);

Where vmsplice_safer() is like vmsplice, except that it only works on
write-protected pages. If you vmsplice_safer() some memory and then
write to the memory, the pipe keeps the old copy.

But this can all be done with memfd and splice, too, I think.


>
> It turns out that VM games are almost always more expensive than just
> copying the data in the first place, but hey, people didn't know that,
> and zero-copy was seen a big deal.
>
> The reality is that almost nobody uses splice and vmsplice at all, and
> they have been a much bigger headache than they are worth. If I could
> go back in time and not do them, I would. But there have been a few
> very special uses that seem to actually like the interfaces.
>
> But it's entirely possible that we should kill vmsplice() (likely by
> just implementing the semantics as "write()") because it's not common
> enough to have the complexity.

I think this is the right choice.

FWIW, the openssl vmsplice() call looks dubious, but I suspect it's
okay because it's vmsplicing to a netlink socket, and the kernel code
on the other end won't read the data after it returns a response.

--Andy

2019-11-03 11:06:33

by Konstantin Khlebnikov

[permalink] [raw]
Subject: Re: [RFC PATCH 07/10] pipe: Conditionalise wakeup in pipe_read() [ver #2]

On 31/10/2019 19.38, David Howells wrote:
> Okay, attached is a change that might give you what you want. I tried my
> pipe-bench program (see cover note) with perf. The output of the program with
> the patch applied was:
>
> - pipe 305127298 36262221772 302185181 7887690
>
> The output of perf with the patch applied:
>
> 239,943.92 msec task-clock # 1.997 CPUs utilized
> 17,728 context-switches # 73.884 M/sec
> 124 cpu-migrations # 0.517 M/sec
> 9,330 page-faults # 38.884 M/sec
> 885,107,207,365 cycles # 3688822.793 GHz
> 1,386,873,499,490 instructions # 1.57 insn per cycle
> 311,037,372,339 branches # 1296296921.931 M/sec
> 33,467,827 branch-misses # 0.01% of all branches
>
> And without:
>
> 239,891.87 msec task-clock # 1.997 CPUs utilized
> 22,187 context-switches # 92.488 M/sec
> 133 cpu-migrations # 0.554 M/sec
> 9,334 page-faults # 38.909 M/sec
> 884,906,976,128 cycles # 3688787.725 GHz
> 1,391,986,932,265 instructions # 1.57 insn per cycle
> 311,394,686,857 branches # 1298067400.849 M/sec
> 30,242,823 branch-misses # 0.01% of all branches
>
> So it did make something like a 20% reduction in context switches.

Ok. Looks promising. Depending on workload reduction might be much bigger.

I suppose buffer resize (grow) makes wakeup unconditionally. Should be ok.

>
> David
> ---
> diff --git a/fs/pipe.c b/fs/pipe.c
> index e3d5f7a39123..5167921edd73 100644
> --- a/fs/pipe.c
> +++ b/fs/pipe.c
> @@ -276,7 +276,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
> size_t total_len = iov_iter_count(to);
> struct file *filp = iocb->ki_filp;
> struct pipe_inode_info *pipe = filp->private_data;
> - int do_wakeup;
> + int do_wakeup, wake;
> ssize_t ret;
>
> /* Null read succeeds. */
> @@ -329,11 +329,12 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
> tail++;
> pipe->tail = tail;
> do_wakeup = 1;
> - if (head - (tail - 1) == pipe->max_usage)
> + wake = head - (tail - 1) == pipe->max_usage / 2;
> + if (wake)
> wake_up_interruptible_sync_poll_locked(
> &pipe->wait, EPOLLOUT | EPOLLWRNORM);
> spin_unlock_irq(&pipe->wait.lock);
> - if (head - (tail - 1) == pipe->max_usage)
> + if (wake)
> kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
> }
> total_len -= chars;
>

2019-11-03 11:18:45

by Matthew Wilcox

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On Thu, Oct 31, 2019 at 02:57:51PM +0000, David Howells wrote:
> Linus Torvalds <[email protected]> wrote:
>
> > It's shorter and more obvious to just write
> >
> > pipe->head = head;
> >
> > than it is to write
> >
> > pipe_commit_write(pipe, head);
>
> But easier to find the latter. But whatever.

May I suggest that you use a name that's easier to grep for?

$ git grep -cw p_tail
drivers/net/wireless/broadcom/brcm80211/brcmfmac/fwsignal.c:9
$ git grep -cw p_head
drivers/net/wireless/broadcom/brcm80211/brcmfmac/fwsignal.c:3

2019-11-03 12:05:43

by Konstantin Khlebnikov

[permalink] [raw]
Subject: Re: [RFC PATCH 11/10] pipe: Add fsync() support [ver #2]

On 03/11/2019 02.14, Andy Lutomirski wrote:
> On Sat, Nov 2, 2019 at 4:10 PM Linus Torvalds
> <[email protected]> wrote:
>>
>> On Sat, Nov 2, 2019 at 4:02 PM Linus Torvalds
>> <[email protected]> wrote:
>>>
>>> But I don't think anybody actually _did_ any of that. But that's
>>> basically the argument for the three splice operations:
>>> write/vmsplice/splice(). Which one you use depends on the lifetime and
>>> the source of your data. write() is obviously for the copy case (the
>>> source data might not be stable), while splice() is for the "data from
>>> another source", and vmsplace() is "data is from stable data in my
>>> vm".
>>
>> Btw, it's really worth noting that "splice()" and friends are from a
>> more happy-go-lucky time when we were experimenting with new
>> interfaces, and in a day and age when people thought that interfaces
>> like "sendpage()" and zero-copy and playing games with the VM was a
>> great thing to do.
>
> I suppose a nicer interface might be:
>
>
> madvise(buf, len, MADV_STABILIZE);
>
> (MADV_STABILIZE is an imaginary operation that write protects the
> memory a la fork() but without the copying part.)
>
> vmsplice_safer(fd, ...);
>
> Where vmsplice_safer() is like vmsplice, except that it only works on
> write-protected pages. If you vmsplice_safer() some memory and then
> write to the memory, the pipe keeps the old copy.
>
> But this can all be done with memfd and splice, too, I think.

Looks monstrous. This will kill all fun and profit. =)

I think vmsplice should at least deprecate and ignore SPLICE_F_GIFT.

It almost never works - if page still mapped then page_count in
generic_pipe_buf_steal() will be at least 2 (pte and pipe gup).
But if user munmap vma between splicing and consuming (and page not
stuck in lazy tlb and per-cpu vectors) then page from anon lru
could be spliced into file. Ouch.

And looks like fuse device still accepts SPLICE_F_MOVE.

>
>
>>
>> It turns out that VM games are almost always more expensive than just
>> copying the data in the first place, but hey, people didn't know that,
>> and zero-copy was seen a big deal.
>>
>> The reality is that almost nobody uses splice and vmsplice at all, and
>> they have been a much bigger headache than they are worth. If I could
>> go back in time and not do them, I would. But there have been a few
>> very special uses that seem to actually like the interfaces.
>>
>> But it's entirely possible that we should kill vmsplice() (likely by
>> just implementing the semantics as "write()") because it's not common
>> enough to have the complexity.
>
> I think this is the right choice.
>
> FWIW, the openssl vmsplice() call looks dubious, but I suspect it's
> okay because it's vmsplicing to a netlink socket, and the kernel code
> on the other end won't read the data after it returns a response.
>
> --Andy
>

2019-12-06 21:54:47

by Johannes Hirte

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On 2019 Okt 23, David Howells wrote:
> Convert pipes to use head and tail pointers for the buffer ring rather than
> pointer and length as the latter requires two atomic ops to update (or a
> combined op) whereas the former only requires one.

This change breaks firefox on my system. I've noticed that some pages
doesn't load correctly anymore (e.g. facebook, spiegel.de). The pages
start loading and than stop. Looks like firefox is waiting for some
dynamic loading content. I've bisected to this commit, but can't revert
because of conflicts.

--
Regards,
Johannes Hirte

2019-12-06 22:16:01

by Linus Torvalds

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On Fri, Dec 6, 2019 at 1:47 PM Johannes Hirte
<[email protected]> wrote:
>
> This change breaks firefox on my system. I've noticed that some pages
> doesn't load correctly anymore (e.g. facebook, spiegel.de). The pages
> start loading and than stop. Looks like firefox is waiting for some
> dynamic loading content. I've bisected to this commit, but can't revert
> because of conflicts.

Can you check the current git tree, and see if we've fixed it for you.
There are several fixes there, one being the (currently) topmost
commit 76f6777c9cc0 ("pipe: Fix iteration end check in
fuse_dev_splice_write()").

I _just_ pushed out that one, so check that you get it - it sometimes
takes a couple of minutes for the public-facing git servers to mirror
out. I doubt that's the one that would fix firefox, but still..

Linus

2019-12-06 22:16:17

by David Howells

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

Johannes Hirte <[email protected]> wrote:

> > Convert pipes to use head and tail pointers for the buffer ring rather than
> > pointer and length as the latter requires two atomic ops to update (or a
> > combined op) whereas the former only requires one.
>
> This change breaks firefox on my system. I've noticed that some pages
> doesn't load correctly anymore (e.g. facebook, spiegel.de). The pages
> start loading and than stop. Looks like firefox is waiting for some
> dynamic loading content. I've bisected to this commit, but can't revert
> because of conflicts.

There are a number of patches committed to upstream in the last couple of days
that might fix your problem. See:

https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/log/

and look for:

pipe: Fix iteration end check in fuse_dev_splice_write()
pipe: fix incorrect caching of pipe state over pipe_wait()
pipe: Fix missing mask update after pipe_wait()
pipe: Remove assertion from pipe_poll()

David

2019-12-07 00:01:41

by Johannes Hirte

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On 2019 Dez 06, Linus Torvalds wrote:
> On Fri, Dec 6, 2019 at 1:47 PM Johannes Hirte
> <[email protected]> wrote:
> >
> > This change breaks firefox on my system. I've noticed that some pages
> > doesn't load correctly anymore (e.g. facebook, spiegel.de). The pages
> > start loading and than stop. Looks like firefox is waiting for some
> > dynamic loading content. I've bisected to this commit, but can't revert
> > because of conflicts.
>
> Can you check the current git tree, and see if we've fixed it for you.
> There are several fixes there, one being the (currently) topmost
> commit 76f6777c9cc0 ("pipe: Fix iteration end check in
> fuse_dev_splice_write()").
>
> I _just_ pushed out that one, so check that you get it - it sometimes
> takes a couple of minutes for the public-facing git servers to mirror
> out. I doubt that's the one that would fix firefox, but still..
>
> Linus

Tested with 5.4.0-11505-g347f56fb3890 and still the same wrong behavior.
Reliable testcase is facebook, where timeline isn't updated with firefox.

--
Regards,
Johannes Hirte

2019-12-07 01:11:48

by Linus Torvalds

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On Fri, Dec 6, 2019 at 4:00 PM Johannes Hirte
<[email protected]> wrote:
>
> Tested with 5.4.0-11505-g347f56fb3890 and still the same wrong behavior.

Ok, we'll continue looking.

That said, your version string is strange.

Commit 347f56fb3890 should be "v5.4.0-13174-g347f56fb3890", the fact
that you have "11505" confuses me.

The hash is what matters, but I wonder what is going on that you have
the commit count in that version string so wrong.

Linus

2019-12-07 06:48:27

by Linus Torvalds

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On Fri, Dec 6, 2019 at 4:00 PM Johannes Hirte
<[email protected]> wrote:
>
> Tested with 5.4.0-11505-g347f56fb3890 and still the same wrong behavior.
> Reliable testcase is facebook, where timeline isn't updated with firefox.

Hmm. I'm not on FB, so that's not a great test for me.

But I've been staring at the code for a long time, and I did find another issue.

poll() and select() were subtly racy and broken. The code did

unsigned int head = READ_ONCE(pipe->head);
unsigned int tail = READ_ONCE(pipe->tail);

which is ok in theory - select and poll can be racy, and doing racy
reads is ok and we do it in other places too.

But when you don't do proper locking and do racy poll/select, you need
to make sure that *if* you were wrong, and said "there's nothing
pending", you need to have added yourself to the wait-queue so that
any changes caused poll to update.

And the new pipe code did that wrong. It added itself to the poll wait
queues *after* it had read that racy data, so you could get into a
race where

- poll reads stale data

- data changes, wakeup happens

- poll adds itself to the poll wait queue after the wakeup

- poll returns "nothing to read/write" based on stale data, and never
saw the wakeup event that told it otherwise.

So a patch something like the appended (whitespace-damaged once again,
because it's untested and I've only been _looking_ a the code) might
solve that issue.

That said, the race here is quite small. Since that firefox problem is
apparently repeatable for you, the timing is either _very_ unlucky, or
there is something else going on too.

Linus

--- snip snip ---

diff --git a/fs/pipe.c b/fs/pipe.c
index c561f7f5e902..4c39ea9b3419 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -557,12 +557,24 @@ pipe_poll(struct file *filp, poll_table *wait)
{
__poll_t mask;
struct pipe_inode_info *pipe = filp->private_data;
- unsigned int head = READ_ONCE(pipe->head);
- unsigned int tail = READ_ONCE(pipe->tail);
+ unsigned int head, tail;

+ /*
+ * Reading only -- no need for acquiring the semaphore.
+ *
+ * But because this is racy, the code has to add the
+ * entry to the poll table _first_ ..
+ */
poll_wait(filp, &pipe->wait, wait);

- /* Reading only -- no need for acquiring the semaphore. */
+ /*
+ * .. and only then can you do the racy tests. That way,
+ * if something changes and you got it wrong, the poll
+ * table entry will wake you up and fix it.
+ */
+ head = READ_ONCE(pipe->head);
+ tail = READ_ONCE(pipe->tail);
+
mask = 0;
if (filp->f_mode & FMODE_READ) {
if (!pipe_empty(head, tail))

2019-12-08 17:59:46

by Johannes Hirte

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On 2019 Dez 06, Linus Torvalds wrote:
> On Fri, Dec 6, 2019 at 4:00 PM Johannes Hirte
> <[email protected]> wrote:
> >
> > Tested with 5.4.0-11505-g347f56fb3890 and still the same wrong behavior.
>
> Ok, we'll continue looking.
>
> That said, your version string is strange.
>
> Commit 347f56fb3890 should be "v5.4.0-13174-g347f56fb3890", the fact
> that you have "11505" confuses me.
>
> The hash is what matters, but I wonder what is going on that you have
> the commit count in that version string so wrong.
>
> Linus

Yes, something got messed up here. After last pull, git describe says:

drm-next-2019-12-06-11662-g9455d25f4e3b

whereas with a fresh cloned repo I get:

v5.4-13331-g9455d25f4e3b

I assume the later is right. With this version the bug seems to be
fixed, regardless of the commit count. Tested with different websites
and firefox works as expected again.


--
Regards,
Johannes Hirte

2019-12-08 18:13:15

by Linus Torvalds

[permalink] [raw]
Subject: Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

On Sun, Dec 8, 2019 at 9:56 AM Johannes Hirte
<[email protected]> wrote:
>
> whereas with a fresh cloned repo I get:
>
> v5.4-13331-g9455d25f4e3b
>
> I assume the later is right. With this version the bug seems to be
> fixed, regardless of the commit count. Tested with different websites
> and firefox works as expected again.

Ok, good. It was almost certainly the select/poll race fix - Mariusz
Ceier reported a problem with youtube hanging, and confirmed that the
poll/select fix seems to have cleared that one up. I suspect that your
hang on fb and spiegel.de were the same thing.

So I think the pipe code is stable again with current -git. Thanks for
reporting and testing,

Linus