This prevents an issue where an ACK is delayed, a retransmit is queued (either
at the RPC or TCP level) and the ACK arrives before the retransmission hits the
wire. If this happens to an NFS WRITE RPC then the write() system call
completes and the userspace process can continue, potentially modifying data
referenced by the retransmission before the retransmission occurs.
Signed-off-by: Ian Campbell <[email protected]>
Acked-by: Trond Myklebust <[email protected]>
Cc: "David S. Miller" <[email protected]>
Cc: Neil Brown <[email protected]>
Cc: "J. Bruce Fields" <[email protected]>
Cc: [email protected]
Cc: [email protected]
---
include/linux/sunrpc/xdr.h | 2 ++
include/linux/sunrpc/xprt.h | 5 ++++-
net/sunrpc/clnt.c | 27 ++++++++++++++++++++++-----
net/sunrpc/svcsock.c | 3 ++-
net/sunrpc/xprt.c | 13 +++++++++++++
net/sunrpc/xprtsock.c | 3 ++-
6 files changed, 45 insertions(+), 8 deletions(-)
diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
index a20970e..172f81e 100644
--- a/include/linux/sunrpc/xdr.h
+++ b/include/linux/sunrpc/xdr.h
@@ -16,6 +16,7 @@
#include <asm/byteorder.h>
#include <asm/unaligned.h>
#include <linux/scatterlist.h>
+#include <linux/skbuff.h>
/*
* Buffer adjustment
@@ -57,6 +58,7 @@ struct xdr_buf {
tail[1]; /* Appended after page data */
struct page ** pages; /* Array of contiguous pages */
+ struct skb_frag_destructor *destructor;
unsigned int page_base, /* Start of page data */
page_len, /* Length of page data */
flags; /* Flags for data disposition */
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 15518a1..75131eb 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -92,7 +92,10 @@ struct rpc_rqst {
/* A cookie used to track the
state of the transport
connection */
-
+ struct skb_frag_destructor destructor; /* SKB paged fragment
+ * destructor for
+ * transmitted pages*/
+
/*
* Partial send handling
*/
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index c5347d2..919538d 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -61,6 +61,7 @@ static void call_reserve(struct rpc_task *task);
static void call_reserveresult(struct rpc_task *task);
static void call_allocate(struct rpc_task *task);
static void call_decode(struct rpc_task *task);
+static void call_complete(struct rpc_task *task);
static void call_bind(struct rpc_task *task);
static void call_bind_status(struct rpc_task *task);
static void call_transmit(struct rpc_task *task);
@@ -1113,6 +1114,8 @@ rpc_xdr_encode(struct rpc_task *task)
(char *)req->rq_buffer + req->rq_callsize,
req->rq_rcvsize);
+ req->rq_snd_buf.destructor = &req->destructor;
+
p = rpc_encode_header(task);
if (p == NULL) {
printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
@@ -1276,6 +1279,7 @@ call_connect_status(struct rpc_task *task)
static void
call_transmit(struct rpc_task *task)
{
+ struct rpc_rqst *req = task->tk_rqstp;
dprint_status(task);
task->tk_action = call_status;
@@ -1309,8 +1313,8 @@ call_transmit(struct rpc_task *task)
call_transmit_status(task);
if (rpc_reply_expected(task))
return;
- task->tk_action = rpc_exit_task;
- rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
+ task->tk_action = call_complete;
+ skb_frag_destructor_unref(&req->destructor);
}
/*
@@ -1383,7 +1387,8 @@ call_bc_transmit(struct rpc_task *task)
return;
}
- task->tk_action = rpc_exit_task;
+ task->tk_action = call_complete;
+ skb_frag_destructor_unref(&req->destructor);
if (task->tk_status < 0) {
printk(KERN_NOTICE "RPC: Could not send backchannel reply "
"error: %d\n", task->tk_status);
@@ -1423,7 +1428,6 @@ call_bc_transmit(struct rpc_task *task)
"error: %d\n", task->tk_status);
break;
}
- rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
@@ -1589,12 +1593,14 @@ call_decode(struct rpc_task *task)
return;
}
- task->tk_action = rpc_exit_task;
+ task->tk_action = call_complete;
if (decode) {
task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
task->tk_msg.rpc_resp);
}
+ rpc_sleep_on(&req->rq_xprt->pending, task, NULL);
+ skb_frag_destructor_unref(&req->destructor);
dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
task->tk_status);
return;
@@ -1609,6 +1615,17 @@ out_retry:
}
}
+/*
+ * 8. Wait for pages to be released by the network stack.
+ */
+static void
+call_complete(struct rpc_task *task)
+{
+ dprintk("RPC: %5u call_complete result %d\n",
+ task->tk_pid, task->tk_status);
+ task->tk_action = rpc_exit_task;
+}
+
static __be32 *
rpc_encode_header(struct rpc_task *task)
{
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 852a258..3685cad 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -196,7 +196,8 @@ int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
while (pglen > 0) {
if (slen == size)
flags = 0;
- result = kernel_sendpage(sock, *ppage, NULL, base, size, flags);
+ result = kernel_sendpage(sock, *ppage, xdr->destructor,
+ base, size, flags);
if (result > 0)
len += result;
if (result != size)
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index f4385e4..925aa0c 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1103,6 +1103,16 @@ static inline void xprt_init_xid(struct rpc_xprt *xprt)
xprt->xid = net_random();
}
+static int xprt_complete_skb_pages(void *calldata)
+{
+ struct rpc_task *task = calldata;
+ struct rpc_rqst *req = task->tk_rqstp;
+
+ dprintk("RPC: %5u completing skb pages\n", task->tk_pid);
+ rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
+ return 0;
+}
+
static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
struct rpc_rqst *req = task->tk_rqstp;
@@ -1115,6 +1125,9 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
req->rq_xid = xprt_alloc_xid(xprt);
req->rq_release_snd_buf = NULL;
xprt_reset_majortimeo(req);
+ atomic_set(&req->destructor.ref, 1);
+ req->destructor.destroy = &xprt_complete_skb_pages;
+ req->destructor.data = task;
dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
req, ntohl(req->rq_xid));
}
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index f79e40e9..af3a106 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -408,7 +408,8 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
remainder -= len;
if (remainder != 0 || more)
flags |= MSG_MORE;
- err = sock->ops->sendpage(sock, *ppage, NULL, base, len, flags);
+ err = sock->ops->sendpage(sock, *ppage, xdr->destructor,
+ base, len, flags);
if (remainder == 0 || err != len)
break;
sent += err;
--
1.7.2.5
On Wed, Nov 09, 2011 at 03:02:07PM +0000, Ian Campbell wrote:
> This prevents an issue where an ACK is delayed, a retransmit is queued (either
> at the RPC or TCP level) and the ACK arrives before the retransmission hits the
> wire. If this happens to an NFS WRITE RPC then the write() system call
> completes and the userspace process can continue, potentially modifying data
> referenced by the retransmission before the retransmission occurs.
>
> Signed-off-by: Ian Campbell <[email protected]>
> Acked-by: Trond Myklebust <[email protected]>
> Cc: "David S. Miller" <[email protected]>
> Cc: Neil Brown <[email protected]>
> Cc: "J. Bruce Fields" <[email protected]>
> Cc: [email protected]
> Cc: [email protected]
So this blocks the system call until all page references
are gone, right?
But, there's no upper limit on how long the
page is referenced, correct? consider a bridged setup
with an skb queued at a tap device - this cause one process
to block another one by virtue of not consuming a cloned skb?
> ---
> include/linux/sunrpc/xdr.h | 2 ++
> include/linux/sunrpc/xprt.h | 5 ++++-
> net/sunrpc/clnt.c | 27 ++++++++++++++++++++++-----
> net/sunrpc/svcsock.c | 3 ++-
> net/sunrpc/xprt.c | 13 +++++++++++++
> net/sunrpc/xprtsock.c | 3 ++-
> 6 files changed, 45 insertions(+), 8 deletions(-)
>
> diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
> index a20970e..172f81e 100644
> --- a/include/linux/sunrpc/xdr.h
> +++ b/include/linux/sunrpc/xdr.h
> @@ -16,6 +16,7 @@
> #include <asm/byteorder.h>
> #include <asm/unaligned.h>
> #include <linux/scatterlist.h>
> +#include <linux/skbuff.h>
>
> /*
> * Buffer adjustment
> @@ -57,6 +58,7 @@ struct xdr_buf {
> tail[1]; /* Appended after page data */
>
> struct page ** pages; /* Array of contiguous pages */
> + struct skb_frag_destructor *destructor;
> unsigned int page_base, /* Start of page data */
> page_len, /* Length of page data */
> flags; /* Flags for data disposition */
> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> index 15518a1..75131eb 100644
> --- a/include/linux/sunrpc/xprt.h
> +++ b/include/linux/sunrpc/xprt.h
> @@ -92,7 +92,10 @@ struct rpc_rqst {
> /* A cookie used to track the
> state of the transport
> connection */
> -
> + struct skb_frag_destructor destructor; /* SKB paged fragment
> + * destructor for
> + * transmitted pages*/
> +
> /*
> * Partial send handling
> */
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index c5347d2..919538d 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -61,6 +61,7 @@ static void call_reserve(struct rpc_task *task);
> static void call_reserveresult(struct rpc_task *task);
> static void call_allocate(struct rpc_task *task);
> static void call_decode(struct rpc_task *task);
> +static void call_complete(struct rpc_task *task);
> static void call_bind(struct rpc_task *task);
> static void call_bind_status(struct rpc_task *task);
> static void call_transmit(struct rpc_task *task);
> @@ -1113,6 +1114,8 @@ rpc_xdr_encode(struct rpc_task *task)
> (char *)req->rq_buffer + req->rq_callsize,
> req->rq_rcvsize);
>
> + req->rq_snd_buf.destructor = &req->destructor;
> +
> p = rpc_encode_header(task);
> if (p == NULL) {
> printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
> @@ -1276,6 +1279,7 @@ call_connect_status(struct rpc_task *task)
> static void
> call_transmit(struct rpc_task *task)
> {
> + struct rpc_rqst *req = task->tk_rqstp;
> dprint_status(task);
>
> task->tk_action = call_status;
> @@ -1309,8 +1313,8 @@ call_transmit(struct rpc_task *task)
> call_transmit_status(task);
> if (rpc_reply_expected(task))
> return;
> - task->tk_action = rpc_exit_task;
> - rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
> + task->tk_action = call_complete;
> + skb_frag_destructor_unref(&req->destructor);
> }
>
> /*
> @@ -1383,7 +1387,8 @@ call_bc_transmit(struct rpc_task *task)
> return;
> }
>
> - task->tk_action = rpc_exit_task;
> + task->tk_action = call_complete;
> + skb_frag_destructor_unref(&req->destructor);
> if (task->tk_status < 0) {
> printk(KERN_NOTICE "RPC: Could not send backchannel reply "
> "error: %d\n", task->tk_status);
> @@ -1423,7 +1428,6 @@ call_bc_transmit(struct rpc_task *task)
> "error: %d\n", task->tk_status);
> break;
> }
> - rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
> }
> #endif /* CONFIG_SUNRPC_BACKCHANNEL */
>
> @@ -1589,12 +1593,14 @@ call_decode(struct rpc_task *task)
> return;
> }
>
> - task->tk_action = rpc_exit_task;
> + task->tk_action = call_complete;
>
> if (decode) {
> task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
> task->tk_msg.rpc_resp);
> }
> + rpc_sleep_on(&req->rq_xprt->pending, task, NULL);
> + skb_frag_destructor_unref(&req->destructor);
> dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
> task->tk_status);
> return;
> @@ -1609,6 +1615,17 @@ out_retry:
> }
> }
>
> +/*
> + * 8. Wait for pages to be released by the network stack.
> + */
> +static void
> +call_complete(struct rpc_task *task)
> +{
> + dprintk("RPC: %5u call_complete result %d\n",
> + task->tk_pid, task->tk_status);
> + task->tk_action = rpc_exit_task;
> +}
> +
> static __be32 *
> rpc_encode_header(struct rpc_task *task)
> {
> diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
> index 852a258..3685cad 100644
> --- a/net/sunrpc/svcsock.c
> +++ b/net/sunrpc/svcsock.c
> @@ -196,7 +196,8 @@ int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
> while (pglen > 0) {
> if (slen == size)
> flags = 0;
> - result = kernel_sendpage(sock, *ppage, NULL, base, size, flags);
> + result = kernel_sendpage(sock, *ppage, xdr->destructor,
> + base, size, flags);
> if (result > 0)
> len += result;
> if (result != size)
> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> index f4385e4..925aa0c 100644
> --- a/net/sunrpc/xprt.c
> +++ b/net/sunrpc/xprt.c
> @@ -1103,6 +1103,16 @@ static inline void xprt_init_xid(struct rpc_xprt *xprt)
> xprt->xid = net_random();
> }
>
> +static int xprt_complete_skb_pages(void *calldata)
> +{
> + struct rpc_task *task = calldata;
> + struct rpc_rqst *req = task->tk_rqstp;
> +
> + dprintk("RPC: %5u completing skb pages\n", task->tk_pid);
> + rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
> + return 0;
> +}
> +
> static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
> {
> struct rpc_rqst *req = task->tk_rqstp;
> @@ -1115,6 +1125,9 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
> req->rq_xid = xprt_alloc_xid(xprt);
> req->rq_release_snd_buf = NULL;
> xprt_reset_majortimeo(req);
> + atomic_set(&req->destructor.ref, 1);
> + req->destructor.destroy = &xprt_complete_skb_pages;
> + req->destructor.data = task;
> dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
> req, ntohl(req->rq_xid));
> }
> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> index f79e40e9..af3a106 100644
> --- a/net/sunrpc/xprtsock.c
> +++ b/net/sunrpc/xprtsock.c
> @@ -408,7 +408,8 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
> remainder -= len;
> if (remainder != 0 || more)
> flags |= MSG_MORE;
> - err = sock->ops->sendpage(sock, *ppage, NULL, base, len, flags);
> + err = sock->ops->sendpage(sock, *ppage, xdr->destructor,
> + base, len, flags);
> if (remainder == 0 || err != len)
> break;
> sent += err;
> --
> 1.7.2.5
>
> --
> To unsubscribe from this list: send the line "unsubscribe netdev" in
> the body of a message to [email protected]
> More majordomo info at http://vger.kernel.org/majordomo-info.html
On Sun, 2011-11-13 at 10:17 +0000, Michael S. Tsirkin wrote:
> On Fri, Nov 11, 2011 at 01:20:27PM +0000, Ian Campbell wrote:
> > On Fri, 2011-11-11 at 12:38 +0000, Michael S. Tsirkin wrote:
> > > On Wed, Nov 09, 2011 at 03:02:07PM +0000, Ian Campbell wrote:
> > > > This prevents an issue where an ACK is delayed, a retransmit is queued (either
> > > > at the RPC or TCP level) and the ACK arrives before the retransmission hits the
> > > > wire. If this happens to an NFS WRITE RPC then the write() system call
> > > > completes and the userspace process can continue, potentially modifying data
> > > > referenced by the retransmission before the retransmission occurs.
> > > >
> > > > Signed-off-by: Ian Campbell <[email protected]>
> > > > Acked-by: Trond Myklebust <[email protected]>
> > > > Cc: "David S. Miller" <[email protected]>
> > > > Cc: Neil Brown <[email protected]>
> > > > Cc: "J. Bruce Fields" <[email protected]>
> > > > Cc: [email protected]
> > > > Cc: [email protected]
> > >
> > > So this blocks the system call until all page references
> > > are gone, right?
> >
> > Right. The alternative is to return to userspace while the network stack
> > still has a reference to the buffer which was passed in -- that's the
> > exact class of problem this patch is supposed to fix.
>
> BTW, the increased latency and the overhead extra wakeups might for some
> workloads be greater than the cost of the data copy.
Under normal circumstances these paths should not be activated at all.
These only come into play if there are delays in the network coming from
somewhere and I would expect any negative effect from that to outweigh
either a copy or an additional wakeup.
> >
> > > consider a bridged setup
> > > with an skb queued at a tap device - this cause one process
> > > to block another one by virtue of not consuming a cloned skb?
> >
> > Hmm, yes.
> >
> > One approach might be to introduce the concept of an skb timeout to the
> > stack as a whole and cancel (or deep copy) after that timeout occurs.
> > That's going to be tricky though I suspect...
>
> Further, an application might use signals such as SIGALARM,
> delaying them significantly will cause trouble.
AIUI there is nothing to stop the SIGALARM being delivered in a timely
manner, all which may need to be delayed is the write() returning
-EINTR.
When -EINTR is returned the buffer must no longer be referenced either
implicitly or explicitly by the kernel and the write must not have
completed and nor should it complete in the future (that way lies
corruption of varying sorts) so copying the data pages is not helpful in
this case.
This patch ensures that the buffer is no longer referenced when the
write returns. It's possible that NFS might need to cancel a write
operation in order to not complete it after returning -EINTR (I don't
know if it does this or not) but I don't think this series impacts that
one way or the other.
>
> > A simpler option would be to have an end points such as a tap device
>
> Which end points would that be? Doesn't this affect a packet socket
> with matching filters? A userspace TCP socket that happens to
> reside on the same box? It also seems that at least with a tap device
> an skb can get queued in a qdisk, also indefinitely, right?
Those are all possibilities.
In order for this patch to have any impact on any of these scenarios
those end points would have to currently be referencing and using pages
of data which have been "returned" to the originating user process and
may be changing under their feet. This is without a doubt a Bad Thing.
In the normal case it is likely that the same end point which is
injecting delay is also the one the originating process is actually
trying to talk to and so the delay would already have been present and
this patch doesn't delay things any further.
If there are parts of the stack which can end up holding onto an skb for
an arbitrary amount of time then I think that is something which needs
to be fixed up in those end points rather than in everyone who injects
an skb into the stack. Whether an immediate deep copy or a more lazy
approach is appropriate I suspect depends upon the exact use case of
each end point.
Having said that one idea which springs to mind would be to allow
someone who has injected a page into the stack to "cancel" it. Since I
am working on pushing the struct page * down into the struct
skb_destructor this could be as simple as setting the page to NULL.
However every end point would need to be taught to expect this. I'm sure
there are also all sorts of locking nightmares underlying this idea.
Perhaps you'd need separate reference counts for queued vs in active
use.
Ian.
>
> > which can swallow skbs for arbitrary times implement a policy in this
> > regard, either to deep copy or drop after a timeout?
> >
> > Ian.
>
> Or deep copy immediately?
>
> This prevents an issue where an ACK is delayed, a retransmit is queued
(either
> at the RPC or TCP level) and the ACK arrives before the retransmission
hits the
> wire. If this happens to an NFS WRITE RPC then the write() system call
> completes and the userspace process can continue, potentially
modifying data
> referenced by the retransmission before the retransmission occurs.
The only problem I see is that the source address might get
invalidated (assuming it is a user address mapped into kernel).
Not sure what effect the fault would have...
If it is an RPC retransmittion the NFS write won't be
terminated by the RPC ACK (because it is a stale ACK)
and the destination will see two copies of the NFS write.
(This is quite common with NFS/UDP.)
If it is a TCP retransmittion, then the receiving TCP stack
will see it as duplicate data and discard the contents
without passing them to RPC.
Interrupting an NFS write with a signal is problematic
anyway - I'm sure I've seen systems (SYSV?) when NFS
writes were uninterruptable. NFI how Linux handles this.
David
On Fri, 2011-11-11 at 12:38 +0000, Michael S. Tsirkin wrote:
> On Wed, Nov 09, 2011 at 03:02:07PM +0000, Ian Campbell wrote:
> > This prevents an issue where an ACK is delayed, a retransmit is queued (either
> > at the RPC or TCP level) and the ACK arrives before the retransmission hits the
> > wire. If this happens to an NFS WRITE RPC then the write() system call
> > completes and the userspace process can continue, potentially modifying data
> > referenced by the retransmission before the retransmission occurs.
> >
> > Signed-off-by: Ian Campbell <[email protected]>
> > Acked-by: Trond Myklebust <[email protected]>
> > Cc: "David S. Miller" <[email protected]>
> > Cc: Neil Brown <[email protected]>
> > Cc: "J. Bruce Fields" <[email protected]>
> > Cc: [email protected]
> > Cc: [email protected]
>
> So this blocks the system call until all page references
> are gone, right?
Right. The alternative is to return to userspace while the network stack
still has a reference to the buffer which was passed in -- that's the
exact class of problem this patch is supposed to fix.
> But, there's no upper limit on how long the
> page is referenced, correct?
Correct.
> consider a bridged setup
> with an skb queued at a tap device - this cause one process
> to block another one by virtue of not consuming a cloned skb?
Hmm, yes.
One approach might be to introduce the concept of an skb timeout to the
stack as a whole and cancel (or deep copy) after that timeout occurs.
That's going to be tricky though I suspect...
A simpler option would be to have an end points such as a tap device
which can swallow skbs for arbitrary times implement a policy in this
regard, either to deep copy or drop after a timeout?
Ian.
>
> > ---
> > include/linux/sunrpc/xdr.h | 2 ++
> > include/linux/sunrpc/xprt.h | 5 ++++-
> > net/sunrpc/clnt.c | 27 ++++++++++++++++++++++-----
> > net/sunrpc/svcsock.c | 3 ++-
> > net/sunrpc/xprt.c | 13 +++++++++++++
> > net/sunrpc/xprtsock.c | 3 ++-
> > 6 files changed, 45 insertions(+), 8 deletions(-)
> >
> > diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
> > index a20970e..172f81e 100644
> > --- a/include/linux/sunrpc/xdr.h
> > +++ b/include/linux/sunrpc/xdr.h
> > @@ -16,6 +16,7 @@
> > #include <asm/byteorder.h>
> > #include <asm/unaligned.h>
> > #include <linux/scatterlist.h>
> > +#include <linux/skbuff.h>
> >
> > /*
> > * Buffer adjustment
> > @@ -57,6 +58,7 @@ struct xdr_buf {
> > tail[1]; /* Appended after page data */
> >
> > struct page ** pages; /* Array of contiguous pages */
> > + struct skb_frag_destructor *destructor;
> > unsigned int page_base, /* Start of page data */
> > page_len, /* Length of page data */
> > flags; /* Flags for data disposition */
> > diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> > index 15518a1..75131eb 100644
> > --- a/include/linux/sunrpc/xprt.h
> > +++ b/include/linux/sunrpc/xprt.h
> > @@ -92,7 +92,10 @@ struct rpc_rqst {
> > /* A cookie used to track the
> > state of the transport
> > connection */
> > -
> > + struct skb_frag_destructor destructor; /* SKB paged fragment
> > + * destructor for
> > + * transmitted pages*/
> > +
> > /*
> > * Partial send handling
> > */
> > diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> > index c5347d2..919538d 100644
> > --- a/net/sunrpc/clnt.c
> > +++ b/net/sunrpc/clnt.c
> > @@ -61,6 +61,7 @@ static void call_reserve(struct rpc_task *task);
> > static void call_reserveresult(struct rpc_task *task);
> > static void call_allocate(struct rpc_task *task);
> > static void call_decode(struct rpc_task *task);
> > +static void call_complete(struct rpc_task *task);
> > static void call_bind(struct rpc_task *task);
> > static void call_bind_status(struct rpc_task *task);
> > static void call_transmit(struct rpc_task *task);
> > @@ -1113,6 +1114,8 @@ rpc_xdr_encode(struct rpc_task *task)
> > (char *)req->rq_buffer + req->rq_callsize,
> > req->rq_rcvsize);
> >
> > + req->rq_snd_buf.destructor = &req->destructor;
> > +
> > p = rpc_encode_header(task);
> > if (p == NULL) {
> > printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
> > @@ -1276,6 +1279,7 @@ call_connect_status(struct rpc_task *task)
> > static void
> > call_transmit(struct rpc_task *task)
> > {
> > + struct rpc_rqst *req = task->tk_rqstp;
> > dprint_status(task);
> >
> > task->tk_action = call_status;
> > @@ -1309,8 +1313,8 @@ call_transmit(struct rpc_task *task)
> > call_transmit_status(task);
> > if (rpc_reply_expected(task))
> > return;
> > - task->tk_action = rpc_exit_task;
> > - rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
> > + task->tk_action = call_complete;
> > + skb_frag_destructor_unref(&req->destructor);
> > }
> >
> > /*
> > @@ -1383,7 +1387,8 @@ call_bc_transmit(struct rpc_task *task)
> > return;
> > }
> >
> > - task->tk_action = rpc_exit_task;
> > + task->tk_action = call_complete;
> > + skb_frag_destructor_unref(&req->destructor);
> > if (task->tk_status < 0) {
> > printk(KERN_NOTICE "RPC: Could not send backchannel reply "
> > "error: %d\n", task->tk_status);
> > @@ -1423,7 +1428,6 @@ call_bc_transmit(struct rpc_task *task)
> > "error: %d\n", task->tk_status);
> > break;
> > }
> > - rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
> > }
> > #endif /* CONFIG_SUNRPC_BACKCHANNEL */
> >
> > @@ -1589,12 +1593,14 @@ call_decode(struct rpc_task *task)
> > return;
> > }
> >
> > - task->tk_action = rpc_exit_task;
> > + task->tk_action = call_complete;
> >
> > if (decode) {
> > task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
> > task->tk_msg.rpc_resp);
> > }
> > + rpc_sleep_on(&req->rq_xprt->pending, task, NULL);
> > + skb_frag_destructor_unref(&req->destructor);
> > dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
> > task->tk_status);
> > return;
> > @@ -1609,6 +1615,17 @@ out_retry:
> > }
> > }
> >
> > +/*
> > + * 8. Wait for pages to be released by the network stack.
> > + */
> > +static void
> > +call_complete(struct rpc_task *task)
> > +{
> > + dprintk("RPC: %5u call_complete result %d\n",
> > + task->tk_pid, task->tk_status);
> > + task->tk_action = rpc_exit_task;
> > +}
> > +
> > static __be32 *
> > rpc_encode_header(struct rpc_task *task)
> > {
> > diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
> > index 852a258..3685cad 100644
> > --- a/net/sunrpc/svcsock.c
> > +++ b/net/sunrpc/svcsock.c
> > @@ -196,7 +196,8 @@ int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
> > while (pglen > 0) {
> > if (slen == size)
> > flags = 0;
> > - result = kernel_sendpage(sock, *ppage, NULL, base, size, flags);
> > + result = kernel_sendpage(sock, *ppage, xdr->destructor,
> > + base, size, flags);
> > if (result > 0)
> > len += result;
> > if (result != size)
> > diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> > index f4385e4..925aa0c 100644
> > --- a/net/sunrpc/xprt.c
> > +++ b/net/sunrpc/xprt.c
> > @@ -1103,6 +1103,16 @@ static inline void xprt_init_xid(struct rpc_xprt *xprt)
> > xprt->xid = net_random();
> > }
> >
> > +static int xprt_complete_skb_pages(void *calldata)
> > +{
> > + struct rpc_task *task = calldata;
> > + struct rpc_rqst *req = task->tk_rqstp;
> > +
> > + dprintk("RPC: %5u completing skb pages\n", task->tk_pid);
> > + rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
> > + return 0;
> > +}
> > +
> > static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
> > {
> > struct rpc_rqst *req = task->tk_rqstp;
> > @@ -1115,6 +1125,9 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
> > req->rq_xid = xprt_alloc_xid(xprt);
> > req->rq_release_snd_buf = NULL;
> > xprt_reset_majortimeo(req);
> > + atomic_set(&req->destructor.ref, 1);
> > + req->destructor.destroy = &xprt_complete_skb_pages;
> > + req->destructor.data = task;
> > dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
> > req, ntohl(req->rq_xid));
> > }
> > diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> > index f79e40e9..af3a106 100644
> > --- a/net/sunrpc/xprtsock.c
> > +++ b/net/sunrpc/xprtsock.c
> > @@ -408,7 +408,8 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
> > remainder -= len;
> > if (remainder != 0 || more)
> > flags |= MSG_MORE;
> > - err = sock->ops->sendpage(sock, *ppage, NULL, base, len, flags);
> > + err = sock->ops->sendpage(sock, *ppage, xdr->destructor,
> > + base, len, flags);
> > if (remainder == 0 || err != len)
> > break;
> > sent += err;
> > --
> > 1.7.2.5
> >
> > --
> > To unsubscribe from this list: send the line "unsubscribe netdev" in
> > the body of a message to [email protected]
> > More majordomo info at http://vger.kernel.org/majordomo-info.html
On Fri, Nov 11, 2011 at 01:20:27PM +0000, Ian Campbell wrote:
> On Fri, 2011-11-11 at 12:38 +0000, Michael S. Tsirkin wrote:
> > On Wed, Nov 09, 2011 at 03:02:07PM +0000, Ian Campbell wrote:
> > > This prevents an issue where an ACK is delayed, a retransmit is queued (either
> > > at the RPC or TCP level) and the ACK arrives before the retransmission hits the
> > > wire. If this happens to an NFS WRITE RPC then the write() system call
> > > completes and the userspace process can continue, potentially modifying data
> > > referenced by the retransmission before the retransmission occurs.
> > >
> > > Signed-off-by: Ian Campbell <[email protected]>
> > > Acked-by: Trond Myklebust <[email protected]>
> > > Cc: "David S. Miller" <[email protected]>
> > > Cc: Neil Brown <[email protected]>
> > > Cc: "J. Bruce Fields" <[email protected]>
> > > Cc: [email protected]
> > > Cc: [email protected]
> >
> > So this blocks the system call until all page references
> > are gone, right?
>
> Right. The alternative is to return to userspace while the network stack
> still has a reference to the buffer which was passed in -- that's the
> exact class of problem this patch is supposed to fix.
>
> > But, there's no upper limit on how long the
> > page is referenced, correct?
>
> Correct.
>
> > consider a bridged setup
> > with an skb queued at a tap device - this cause one process
> > to block another one by virtue of not consuming a cloned skb?
>
> Hmm, yes.
>
> One approach might be to introduce the concept of an skb timeout to the
> stack as a whole and cancel (or deep copy) after that timeout occurs.
> That's going to be tricky though I suspect...
>
> A simpler option would be to have an end points such as a tap device
> which can swallow skbs for arbitrary times implement a policy in this
> regard, either to deep copy or drop after a timeout?
Stupid question: Is it a requirement that you be safe against DOS by a
rogue process with a tap device? (And if so, does current code satisfy
that requirement?)
--b.
On Fri, Nov 11, 2011 at 01:20:27PM +0000, Ian Campbell wrote:
> On Fri, 2011-11-11 at 12:38 +0000, Michael S. Tsirkin wrote:
> > On Wed, Nov 09, 2011 at 03:02:07PM +0000, Ian Campbell wrote:
> > > This prevents an issue where an ACK is delayed, a retransmit is queued (either
> > > at the RPC or TCP level) and the ACK arrives before the retransmission hits the
> > > wire. If this happens to an NFS WRITE RPC then the write() system call
> > > completes and the userspace process can continue, potentially modifying data
> > > referenced by the retransmission before the retransmission occurs.
> > >
> > > Signed-off-by: Ian Campbell <[email protected]>
> > > Acked-by: Trond Myklebust <[email protected]>
> > > Cc: "David S. Miller" <[email protected]>
> > > Cc: Neil Brown <[email protected]>
> > > Cc: "J. Bruce Fields" <[email protected]>
> > > Cc: [email protected]
> > > Cc: [email protected]
> >
> > So this blocks the system call until all page references
> > are gone, right?
>
> Right. The alternative is to return to userspace while the network stack
> still has a reference to the buffer which was passed in -- that's the
> exact class of problem this patch is supposed to fix.
BTW, the increased latency and the overhead extra wakeups might for some
workloads be greater than the cost of the data copy.
> > But, there's no upper limit on how long the
> > page is referenced, correct?
>
> Correct.
>
> > consider a bridged setup
> > with an skb queued at a tap device - this cause one process
> > to block another one by virtue of not consuming a cloned skb?
>
> Hmm, yes.
>
> One approach might be to introduce the concept of an skb timeout to the
> stack as a whole and cancel (or deep copy) after that timeout occurs.
> That's going to be tricky though I suspect...
Further, an application might use signals such as SIGALARM,
delaying them significantly will cause trouble.
> A simpler option would be to have an end points such as a tap device
Which end points would that be? Doesn't this affect a packet socket
with matching filters? A userspace TCP socket that happens to
reside on the same box? It also seems that at least with a tap device
an skb can get queued in a qdisk, also indefinitely, right?
> which can swallow skbs for arbitrary times implement a policy in this
> regard, either to deep copy or drop after a timeout?
>
> Ian.
Or deep copy immediately?
--
MST