2018-09-03 19:50:39

by Trond Myklebust

Subject: [PATCH 00/27] Convert RPC client transmission to a queued model

For historical reasons, the RPC client is heavily serialised by the
XPRT_LOCK while transmitting a request. A request must take that lock
before it can start XDR encoding, and must hold it until it has finished
transmitting. In essence, the lock protects the following functions:

- Stream based transport connect/reconnect
- RPCSEC_GSS encoding of the RPC message
- Transmission of a single RPC message

The following patch set assumes that we do not need to do much to
improve the performance of the connect/reconnect case, since that is
expected to be a rare occurrence.

The set deals with the RPCSEC_GSS issues by removing the serialisation
during encoding. Instead, if after grabbing the XPRT_LOCK we detect that
we are about to transmit a message whose sequence number has fallen
outside the window allowed by RFC2203, we simply abort the transmission
of that message and schedule it for re-encoding. Since window sizes are
typically expected to exceed 100 messages or so, we expect such window
misses to be rare in general.
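As a rough illustration of the window test, here is a minimal user-space
sketch in C. It assumes the RFC2203 rule that a server accepts sequence
numbers in the range (highest seen - window, highest seen], and uses the
same wraparound-safe (s32)(new - old) > 0 comparison as gss_seq_is_newer()
in patch 03. The names seq_is_newer() and need_reencode() are hypothetical,
and the sketch ignores context refresh and concurrent updates.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * True if sequence number a is "after" b, modulo 2^32 (serial arithmetic).
 * Relies on the two's-complement conversion to int32_t, as the kernel does.
 */
static bool seq_is_newer(uint32_t a, uint32_t b)
{
	return (int32_t)(a - b) > 0;
}

/*
 * Hypothetical helper: must a message carrying 'seqno' be re-encoded before
 * transmission, given the highest sequence number already sent and the
 * server's window size 'win'?
 */
static bool need_reencode(uint32_t seqno, uint32_t highest_sent, uint32_t win)
{
	/* Serial comparison is only meaningful for windows below 2^31. */
	assert(win < 0x80000000u);

	if (seq_is_newer(seqno, highest_sent))
		return false;	/* advances the window: always acceptable */
	if (win == 0)
		return true;	/* window unknown: re-encode to be safe */
	/* Valid range is (highest_sent - win, highest_sent]. */
	return !seq_is_newer(seqno, highest_sent - win);
}
```

The point of the design is that this check is cheap and misses should be
rare, so re-encoding on a miss costs less than serialising every encode.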

Finally, we try to avoid the requirement that every request must be
woken up and grab the XPRT_LOCK in order to transmit itself. Instead, a
request that currently holds the XPRT_LOCK is allowed to grab other
requests from an ordered queue and transmit them too. The bulk of the
changes in this patchset are dedicated to providing this functionality.
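The queueing idea can be modelled in a few lines of single-threaded C.
This is a sketch only: the names (struct xmit_queue, xmit_enqueue(),
xmit_drain()) are hypothetical, a plain bool stands in for XPRT_LOCK, and
the real code uses spinlocks and RPC wait queues. Whichever caller finds
the lock free transmits everything queued so far, in FIFO order; a caller
that finds it held just returns and relies on the holder. The model does
not cover the race where a request is queued just after the holder has
finished draining, which the real code must handle.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define XMIT_QCAP 16

/* Hypothetical model of an ordered transmit queue guarded by one lock. */
struct xmit_queue {
	int pending[XMIT_QCAP];	/* request ids, in enqueue (FIFO) order */
	size_t head, tail;	/* next to send / next free slot */
	bool locked;		/* stands in for XPRT_LOCK */
	int wire[XMIT_QCAP];	/* order in which requests hit the "wire" */
	size_t nwire;
};

static void xmit_enqueue(struct xmit_queue *q, int id)
{
	assert(q->tail < XMIT_QCAP);
	q->pending[q->tail++] = id;
}

/*
 * Try to take the lock and transmit every queued request, not just the
 * caller's own. Returns how many requests this caller sent; 0 means the
 * lock was busy and the current holder is expected to send them.
 */
static size_t xmit_drain(struct xmit_queue *q)
{
	size_t n = 0;

	if (q->locked)
		return 0;
	q->locked = true;
	while (q->head < q->tail) {
		q->wire[q->nwire++] = q->pending[q->head++];
		n++;
	}
	q->locked = false;
	return n;
}
```

One task that wins the lock can thus push out a whole batch, instead of
each request being woken up and taking the lock in turn.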

Trond Myklebust (27):
SUNRPC: Clean up initialisation of the struct rpc_rqst
SUNRPC: If there is no reply expected, bail early from call_decode
SUNRPC: The transmitted message must lie in the RPCSEC window of
validity
SUNRPC: Simplify identification of when the message send/receive is
complete
SUNRPC: Avoid holding locks across the XDR encoding of the RPC message
SUNRPC: Rename TCP receive-specific state variables
SUNRPC: Move reset of TCP state variables into the reconnect code
SUNRPC: Add socket transmit queue offset tracking
SUNRPC: Simplify dealing with aborted partially transmitted messages
SUNRPC: Refactor the transport request pinning
SUNRPC: Add a helper to wake up a sleeping rpc_task and set its status
SUNRPC: Don't wake queued RPC calls multiple times in xprt_transmit
SUNRPC: Rename xprt->recv_lock to xprt->queue_lock
SUNRPC: Refactor xprt_transmit() to remove the reply queue code
SUNRPC: Refactor xprt_transmit() to remove wait for reply code
SUNRPC: Minor cleanup for call_transmit()
SUNRPC: Distinguish between the slot allocation list and receive queue
SUNRPC: Add a transmission queue for RPC requests
SUNRPC: Refactor RPC call encoding
SUNRPC: Treat the task and request as separate in the
xprt_ops->send_request()
SUNRPC: Don't reset the request 'bytes_sent' counter when releasing
XPRT_LOCK
SUNRPC: Simplify xprt_prepare_transmit()
SUNRPC: Move RPC retransmission stat counter to xprt_transmit()
SUNRPC: Fix up the back channel transmit
SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit
queue
SUNRPC: Queue the request for transmission immediately after encoding
SUNRPC: Convert the xprt->sending queue back to an ordinary wait queue

include/linux/sunrpc/auth.h | 2 +
include/linux/sunrpc/auth_gss.h | 1 +
include/linux/sunrpc/sched.h | 7 +-
include/linux/sunrpc/xprt.h | 23 +-
include/linux/sunrpc/xprtsock.h | 23 +-
include/trace/events/sunrpc.h | 10 +-
net/sunrpc/auth.c | 10 +
net/sunrpc/auth_gss/auth_gss.c | 41 ++
net/sunrpc/backchannel_rqst.c | 3 +-
net/sunrpc/clnt.c | 139 +++---
net/sunrpc/sched.c | 63 ++-
net/sunrpc/svcsock.c | 6 +-
net/sunrpc/xprt.c | 503 +++++++++++++--------
net/sunrpc/xprtrdma/backchannel.c | 3 +-
net/sunrpc/xprtrdma/rpc_rdma.c | 10 +-
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 7 +-
net/sunrpc/xprtrdma/transport.c | 5 +-
net/sunrpc/xprtsock.c | 327 +++++++-------
18 files changed, 728 insertions(+), 455 deletions(-)

--
2.17.1


2018-09-03 19:50:43

by Trond Myklebust

Subject: [PATCH 03/27] SUNRPC: The transmitted message must lie in the RPCSEC window of validity

If a message has been encoded using RPCSEC_GSS, the server maintains a
window of sequence numbers that it considers valid. The client should
normally be tracking that window, and needs to verify that the sequence
number used by the message being transmitted still lies inside the
window of validity.

So far, we've been able to assume this condition would be met
automatically, since the client has been encoding the message only
after taking the socket lock. Once we change that, we will need the
explicit check.
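The gss_xmit_need_reencode() helper added by this patch records the
highest sequence number transmitted so far in gc_seq_xmit, advancing it
with a cmpxchg() loop so that concurrent senders need no lock. A
user-space analogue using C11 <stdatomic.h> might look like the following;
advance_highest() is a hypothetical name, and the sketch covers only the
high-water-mark update, not the window test against gc_win.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Serial-number comparison modulo 2^32, as in gss_seq_is_newer(). */
static bool seq_is_newer(uint32_t a, uint32_t b)
{
	return (int32_t)(a - b) > 0;
}

/*
 * Hypothetical helper: raise *highest to seqno if seqno is newer.
 * Returns true if this caller performed the update, false if an equal
 * or newer value was already recorded (possibly by another thread).
 */
static bool advance_highest(_Atomic uint32_t *highest, uint32_t seqno)
{
	uint32_t cur;

	assert(highest != NULL);
	cur = atomic_load(highest);
	while (seq_is_newer(seqno, cur)) {
		/* On failure, cur is refreshed with the value now stored. */
		if (atomic_compare_exchange_weak(highest, &cur, seqno))
			return true;
	}
	return false;
}
```

A message whose sequence number is not newer than the recorded maximum
then falls through to the window test, as in the kernel helper.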

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/auth.h | 2 ++
include/linux/sunrpc/auth_gss.h | 1 +
net/sunrpc/auth.c | 10 ++++++++
net/sunrpc/auth_gss/auth_gss.c | 41 +++++++++++++++++++++++++++++++++
net/sunrpc/clnt.c | 3 +++
net/sunrpc/xprt.c | 7 ++++++
6 files changed, 64 insertions(+)

diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
index 58a6765c1c5e..2c97a3933ef9 100644
--- a/include/linux/sunrpc/auth.h
+++ b/include/linux/sunrpc/auth.h
@@ -157,6 +157,7 @@ struct rpc_credops {
int (*crkey_timeout)(struct rpc_cred *);
bool (*crkey_to_expire)(struct rpc_cred *);
char * (*crstringify_acceptor)(struct rpc_cred *);
+ bool (*crneed_reencode)(struct rpc_task *);
};

extern const struct rpc_authops authunix_ops;
@@ -192,6 +193,7 @@ __be32 * rpcauth_marshcred(struct rpc_task *, __be32 *);
__be32 * rpcauth_checkverf(struct rpc_task *, __be32 *);
int rpcauth_wrap_req(struct rpc_task *task, kxdreproc_t encode, void *rqstp, __be32 *data, void *obj);
int rpcauth_unwrap_resp(struct rpc_task *task, kxdrdproc_t decode, void *rqstp, __be32 *data, void *obj);
+bool rpcauth_xmit_need_reencode(struct rpc_task *task);
int rpcauth_refreshcred(struct rpc_task *);
void rpcauth_invalcred(struct rpc_task *);
int rpcauth_uptodatecred(struct rpc_task *);
diff --git a/include/linux/sunrpc/auth_gss.h b/include/linux/sunrpc/auth_gss.h
index 0c9eac351aab..30427b729070 100644
--- a/include/linux/sunrpc/auth_gss.h
+++ b/include/linux/sunrpc/auth_gss.h
@@ -70,6 +70,7 @@ struct gss_cl_ctx {
refcount_t count;
enum rpc_gss_proc gc_proc;
u32 gc_seq;
+ u32 gc_seq_xmit;
spinlock_t gc_seq_lock;
struct gss_ctx *gc_gss_ctx;
struct xdr_netobj gc_wire_ctx;
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index 305ecea92170..59df5cdba0ac 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -817,6 +817,16 @@ rpcauth_unwrap_resp(struct rpc_task *task, kxdrdproc_t decode, void *rqstp,
return rpcauth_unwrap_req_decode(decode, rqstp, data, obj);
}

+bool
+rpcauth_xmit_need_reencode(struct rpc_task *task)
+{
+ struct rpc_cred *cred = task->tk_rqstp->rq_cred;
+
+ if (!cred || !cred->cr_ops->crneed_reencode)
+ return false;
+ return cred->cr_ops->crneed_reencode(task);
+}
+
int
rpcauth_refreshcred(struct rpc_task *task)
{
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 21c0aa0a0d1d..c898a7c75e84 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1984,6 +1984,46 @@ gss_unwrap_req_decode(kxdrdproc_t decode, struct rpc_rqst *rqstp,
return decode(rqstp, &xdr, obj);
}

+static bool
+gss_seq_is_newer(u32 new, u32 old)
+{
+ return (s32)(new - old) > 0;
+}
+
+static bool
+gss_xmit_need_reencode(struct rpc_task *task)
+{
+ struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_cred *cred = req->rq_cred;
+ struct gss_cl_ctx *ctx = gss_cred_get_ctx(cred);
+ u32 win, seq_xmit;
+ bool ret = true;
+
+ if (!ctx)
+ return true;
+
+ if (gss_seq_is_newer(req->rq_seqno, READ_ONCE(ctx->gc_seq)))
+ goto out;
+
+ seq_xmit = READ_ONCE(ctx->gc_seq_xmit);
+ while (gss_seq_is_newer(req->rq_seqno, seq_xmit)) {
+ u32 tmp = seq_xmit;
+
+ seq_xmit = cmpxchg(&ctx->gc_seq_xmit, tmp, req->rq_seqno);
+ if (seq_xmit == tmp) {
+ ret = false;
+ goto out;
+ }
+ }
+
+ win = ctx->gc_win;
+ if (win > 0)
+ ret = !gss_seq_is_newer(req->rq_seqno, seq_xmit - win);
+out:
+ gss_put_ctx(ctx);
+ return ret;
+}
+
static int
gss_unwrap_resp(struct rpc_task *task,
kxdrdproc_t decode, void *rqstp, __be32 *p, void *obj)
@@ -2052,6 +2092,7 @@ static const struct rpc_credops gss_credops = {
.crunwrap_resp = gss_unwrap_resp,
.crkey_timeout = gss_key_timeout,
.crstringify_acceptor = gss_stringify_acceptor,
+ .crneed_reencode = gss_xmit_need_reencode,
};

static const struct rpc_credops gss_nullops = {
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 4f1ec8013332..d41b5ac1d4e8 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -2184,6 +2184,9 @@ call_status(struct rpc_task *task)
/* shutdown or soft timeout */
rpc_exit(task, status);
break;
+ case -EBADMSG:
+ task->tk_action = call_transmit;
+ break;
default:
if (clnt->cl_chatty)
printk("%s: RPC call returned error %d\n",
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 6aa09edc9567..3973e10ea2bd 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1014,6 +1014,13 @@ void xprt_transmit(struct rpc_task *task)
dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

if (!req->rq_reply_bytes_recvd) {
+
+ /* Verify that our message lies in the RPCSEC_GSS window */
+ if (!req->rq_bytes_sent && rpcauth_xmit_need_reencode(task)) {
+ task->tk_status = -EBADMSG;
+ return;
+ }
+
if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
/*
* Add to the list only if we're expecting a reply
--
2.17.1

2018-09-03 19:50:50

by Trond Myklebust

Subject: [PATCH 12/27] SUNRPC: Don't wake queued RPC calls multiple times in xprt_transmit

Rather than waking up the entire queue of RPC messages a second time,
just wake up the task that was put to sleep.

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/xprt.c | 9 +++------
1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index bf8fc1a5dbd1..0441e6c8153d 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1075,13 +1075,10 @@ void xprt_transmit(struct rpc_task *task)
spin_lock(&xprt->recv_lock);
if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
rpc_sleep_on(&xprt->pending, task, xprt_timer);
- /*
- * Send an extra queue wakeup call if the
- * connection was dropped in case the call to
- * rpc_sleep_on() raced.
- */
+ /* Wake up immediately if the connection was dropped */
if (!xprt_connected(xprt))
- xprt_wake_pending_tasks(xprt, -ENOTCONN);
+ rpc_wake_up_queued_task_set_status(&xprt->pending,
+ task, -ENOTCONN);
}
spin_unlock(&xprt->recv_lock);
}
--
2.17.1

2018-09-03 19:50:53

by Trond Myklebust

Subject: [PATCH 13/27] SUNRPC: Rename xprt->recv_lock to xprt->queue_lock

We will use the same lock to protect both the transmit and receive queues.

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/xprt.h | 2 +-
net/sunrpc/svcsock.c | 6 ++---
net/sunrpc/xprt.c | 24 ++++++++---------
net/sunrpc/xprtrdma/rpc_rdma.c | 10 ++++----
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 4 +--
net/sunrpc/xprtsock.c | 30 +++++++++++-----------
6 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index bd743c51a865..c25d0a5fda69 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -235,7 +235,7 @@ struct rpc_xprt {
*/
spinlock_t transport_lock; /* lock transport info */
spinlock_t reserve_lock; /* lock slot table */
- spinlock_t recv_lock; /* lock receive list */
+ spinlock_t queue_lock; /* send/receive queue lock */
u32 xid; /* Next XID value to use */
struct rpc_task * snd_task; /* Task blocked in send */
struct svc_xprt *bc_xprt; /* NFSv4.1 backchannel */
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 5445145e639c..db8bb6b3a2b0 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -1004,7 +1004,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)

if (!bc_xprt)
return -EAGAIN;
- spin_lock(&bc_xprt->recv_lock);
+ spin_lock(&bc_xprt->queue_lock);
req = xprt_lookup_rqst(bc_xprt, xid);
if (!req)
goto unlock_notfound;
@@ -1022,7 +1022,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
memcpy(dst->iov_base, src->iov_base, src->iov_len);
xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len);
rqstp->rq_arg.len = 0;
- spin_unlock(&bc_xprt->recv_lock);
+ spin_unlock(&bc_xprt->queue_lock);
return 0;
unlock_notfound:
printk(KERN_NOTICE
@@ -1031,7 +1031,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
__func__, ntohl(calldir),
bc_xprt, ntohl(xid));
unlock_eagain:
- spin_unlock(&bc_xprt->recv_lock);
+ spin_unlock(&bc_xprt->queue_lock);
return -EAGAIN;
}

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 0441e6c8153d..eda305de9f77 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -826,7 +826,7 @@ static void xprt_connect_status(struct rpc_task *task)
* @xprt: transport on which the original request was transmitted
* @xid: RPC XID of incoming reply
*
- * Caller holds xprt->recv_lock.
+ * Caller holds xprt->queue_lock.
*/
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
@@ -888,7 +888,7 @@ static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
* xprt_update_rtt - Update RPC RTT statistics
* @task: RPC request that recently completed
*
- * Caller holds xprt->recv_lock.
+ * Caller holds xprt->queue_lock.
*/
void xprt_update_rtt(struct rpc_task *task)
{
@@ -910,7 +910,7 @@ EXPORT_SYMBOL_GPL(xprt_update_rtt);
* @task: RPC request that recently completed
* @copied: actual number of bytes received from the transport
*
- * Caller holds xprt->recv_lock.
+ * Caller holds xprt->queue_lock.
*/
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
@@ -1030,10 +1030,10 @@ void xprt_transmit(struct rpc_task *task)
memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
sizeof(req->rq_private_buf));
/* Add request to the receive list */
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
list_add_tail(&req->rq_list, &xprt->recv);
set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
xprt_reset_majortimeo(req);
/* Turn off autodisconnect */
del_singleshot_timer_sync(&xprt->timer);
@@ -1072,7 +1072,7 @@ void xprt_transmit(struct rpc_task *task)
* The spinlock ensures atomicity between the test of
* req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
*/
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
rpc_sleep_on(&xprt->pending, task, xprt_timer);
/* Wake up immediately if the connection was dropped */
@@ -1080,7 +1080,7 @@ void xprt_transmit(struct rpc_task *task)
rpc_wake_up_queued_task_set_status(&xprt->pending,
task, -ENOTCONN);
}
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
}
}

@@ -1375,16 +1375,16 @@ void xprt_release(struct rpc_task *task)
task->tk_ops->rpc_count_stats(task, task->tk_calldata);
else if (task->tk_client)
rpc_count_iostats(task, task->tk_client->cl_metrics);
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
if (!list_empty(&req->rq_list)) {
list_del_init(&req->rq_list);
if (atomic_read(&req->rq_pin)) {
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
xprt_wait_on_pinned_rqst(req);
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
}
}
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
spin_lock_bh(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task);
if (xprt->ops->release_request)
@@ -1414,7 +1414,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)

spin_lock_init(&xprt->transport_lock);
spin_lock_init(&xprt->reserve_lock);
- spin_lock_init(&xprt->recv_lock);
+ spin_lock_init(&xprt->queue_lock);

INIT_LIST_HEAD(&xprt->free);
INIT_LIST_HEAD(&xprt->recv);
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c8ae983c6cc0..0020dc401215 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1238,7 +1238,7 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
goto out_badheader;

out:
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
cwnd = xprt->cwnd;
xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
@@ -1246,7 +1246,7 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)

xprt_complete_rqst(rqst->rq_task, status);
xprt_unpin_rqst(rqst);
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
return;

/* If the incoming reply terminated a pending RPC, the next
@@ -1345,7 +1345,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
/* Match incoming rpcrdma_rep to an rpcrdma_req to
* get context for handling any incoming chunks.
*/
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
if (!rqst)
goto out_norqst;
@@ -1357,7 +1357,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
credits = buf->rb_max_requests;
buf->rb_credits = credits;

- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);

req = rpcr_to_rdmar(rqst);
req->rl_reply = rep;
@@ -1378,7 +1378,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
* is corrupt.
*/
out_norqst:
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
trace_xprtrdma_reply_rqst(rep);
goto repost;

diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index a68180090554..09b12b7568fe 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -56,7 +56,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
if (src->iov_len < 24)
goto out_shortreply;

- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
req = xprt_lookup_rqst(xprt, xid);
if (!req)
goto out_notfound;
@@ -86,7 +86,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
rcvbuf->len = 0;

out_unlock:
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
out:
return ret;

diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 3fbccebd0b10..8d6404259ff9 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -966,12 +966,12 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
return;

/* Look up and lock the request corresponding to the given XID */
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr)
goto out_unlock;
xprt_pin_rqst(rovr);
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
task = rovr->rq_task;

copied = rovr->rq_private_buf.buflen;
@@ -980,16 +980,16 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,

if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
dprintk("RPC: sk_buff copy failed\n");
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
goto out_unpin;
}

- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
xprt_complete_rqst(task, copied);
out_unpin:
xprt_unpin_rqst(rovr);
out_unlock:
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
}

static void xs_local_data_receive(struct sock_xprt *transport)
@@ -1058,13 +1058,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
return;

/* Look up and lock the request corresponding to the given XID */
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr)
goto out_unlock;
xprt_pin_rqst(rovr);
xprt_update_rtt(rovr->rq_task);
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
task = rovr->rq_task;

if ((copied = rovr->rq_private_buf.buflen) > repsize)
@@ -1072,7 +1072,7 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,

/* Suck it into the iovec, verify checksum if not done by hw. */
if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
goto out_unpin;
}
@@ -1081,13 +1081,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
spin_lock_bh(&xprt->transport_lock);
xprt_adjust_cwnd(xprt, task, copied);
spin_unlock_bh(&xprt->transport_lock);
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
xprt_complete_rqst(task, copied);
__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
out_unpin:
xprt_unpin_rqst(rovr);
out_unlock:
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
}

static void xs_udp_data_receive(struct sock_xprt *transport)
@@ -1356,24 +1356,24 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
dprintk("RPC: read reply XID %08x\n", ntohl(transport->recv.xid));

/* Find and lock the request corresponding to this xid */
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
req = xprt_lookup_rqst(xprt, transport->recv.xid);
if (!req) {
dprintk("RPC: XID %08x request not found!\n",
ntohl(transport->recv.xid));
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
return -1;
}
xprt_pin_rqst(req);
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);

xs_tcp_read_common(xprt, desc, req);

- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
xprt_complete_rqst(req->rq_task, transport->recv.copied);
xprt_unpin_rqst(req);
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
return 0;
}

--
2.17.1

2018-09-03 19:50:46

by Trond Myklebust

Subject: [PATCH 08/27] SUNRPC: Add socket transmit queue offset tracking

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/xprtsock.h | 7 ++++++
net/sunrpc/xprtsock.c | 40 ++++++++++++++++++---------------
2 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index 90d5ca8e65f4..005cfb6e7238 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -42,6 +42,13 @@ struct sock_xprt {
flags;
} recv;

+ /*
+ * State of TCP transmit queue
+ */
+ struct {
+ u32 offset;
+ } xmit;
+
/*
* Connection of transports
*/
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index ec1e3f93e707..629cc45e1e6c 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -461,7 +461,7 @@ static int xs_nospace(struct rpc_task *task)
int ret = -EAGAIN;

dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
- task->tk_pid, req->rq_slen - req->rq_bytes_sent,
+ task->tk_pid, req->rq_slen - transport->xmit.offset,
req->rq_slen);

/* Protect against races with write_space */
@@ -528,19 +528,22 @@ static int xs_local_send_request(struct rpc_task *task)
req->rq_svec->iov_base, req->rq_svec->iov_len);

req->rq_xtime = ktime_get();
- status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent,
+ status = xs_sendpages(transport->sock, NULL, 0, xdr,
+ transport->xmit.offset,
true, &sent);
dprintk("RPC: %s(%u) = %d\n",
- __func__, xdr->len - req->rq_bytes_sent, status);
+ __func__, xdr->len - transport->xmit.offset, status);

if (status == -EAGAIN && sock_writeable(transport->inet))
status = -ENOBUFS;

if (likely(sent > 0) || status == 0) {
- req->rq_bytes_sent += sent;
- req->rq_xmit_bytes_sent += sent;
+ transport->xmit.offset += sent;
+ req->rq_bytes_sent = transport->xmit.offset;
if (likely(req->rq_bytes_sent >= req->rq_slen)) {
+ req->rq_xmit_bytes_sent += transport->xmit.offset;
req->rq_bytes_sent = 0;
+ transport->xmit.offset = 0;
return 0;
}
status = -EAGAIN;
@@ -592,10 +595,10 @@ static int xs_udp_send_request(struct rpc_task *task)
return -ENOTCONN;
req->rq_xtime = ktime_get();
status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
- xdr, req->rq_bytes_sent, true, &sent);
+ xdr, 0, true, &sent);

dprintk("RPC: xs_udp_send_request(%u) = %d\n",
- xdr->len - req->rq_bytes_sent, status);
+ xdr->len, status);

/* firewall is blocking us, don't return -EAGAIN or we end up looping */
if (status == -EPERM)
@@ -684,17 +687,20 @@ static int xs_tcp_send_request(struct rpc_task *task)
while (1) {
sent = 0;
status = xs_sendpages(transport->sock, NULL, 0, xdr,
- req->rq_bytes_sent, zerocopy, &sent);
+ transport->xmit.offset,
+ zerocopy, &sent);

dprintk("RPC: xs_tcp_send_request(%u) = %d\n",
- xdr->len - req->rq_bytes_sent, status);
+ xdr->len - transport->xmit.offset, status);

/* If we've sent the entire packet, immediately
* reset the count of bytes sent. */
- req->rq_bytes_sent += sent;
- req->rq_xmit_bytes_sent += sent;
+ transport->xmit.offset += sent;
+ req->rq_bytes_sent = transport->xmit.offset;
if (likely(req->rq_bytes_sent >= req->rq_slen)) {
+ req->rq_xmit_bytes_sent += transport->xmit.offset;
req->rq_bytes_sent = 0;
+ transport->xmit.offset = 0;
return 0;
}

@@ -760,18 +766,13 @@ static int xs_tcp_send_request(struct rpc_task *task)
*/
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
- struct rpc_rqst *req;
+ struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

if (task != xprt->snd_task)
return;
if (task == NULL)
goto out_release;
- req = task->tk_rqstp;
- if (req == NULL)
- goto out_release;
- if (req->rq_bytes_sent == 0)
- goto out_release;
- if (req->rq_bytes_sent == req->rq_snd_buf.len)
+ if (transport->xmit.offset == 0 || !xprt_connected(xprt))
goto out_release;
set_bit(XPRT_CLOSE_WAIT, &xprt->state);
out_release:
@@ -2021,6 +2022,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
write_unlock_bh(&sk->sk_callback_lock);
}

+ transport->xmit.offset = 0;
+
/* Tell the socket layer to start connecting... */
xprt->stat.connect_count++;
xprt->stat.connect_start = jiffies;
@@ -2384,6 +2387,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
transport->recv.len = 0;
transport->recv.copied = 0;
transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
+ transport->xmit.offset = 0;

/* Tell the socket layer to start connecting... */
xprt->stat.connect_count++;
--
2.17.1

2018-09-03 19:50:53

by Trond Myklebust

Subject: [PATCH 15/27] SUNRPC: Refactor xprt_transmit() to remove wait for reply code

Allow the caller in clnt.c to call into the code to wait for a reply
after calling xprt_transmit(). Again, the reason is that the backchannel
code does not need this functionality.
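The wait logic moved into xprt_request_wait_receive() follows a
check/lock/re-check pattern: test RPC_TASK_NEED_RECV cheaply, take
queue_lock, test again (the reply may have arrived in between), go to
sleep, and wake straight back up with -ENOTCONN if the connection was
dropped or replaced since transmission. A user-space model could look like
this; the struct and function names are hypothetical, a pthread mutex
stands in for queue_lock, and a flag stands in for the pending wait queue.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-ins for struct rpc_xprt and struct rpc_rqst. */
struct model_xprt {
	pthread_mutex_t queue_lock;
	unsigned int connect_cookie;	/* bumped on every (re)connect */
	bool connected;
};

struct model_req {
	struct model_xprt *xprt;
	unsigned int connect_cookie;	/* sampled when the request was sent */
	atomic_bool need_recv;		/* models RPC_TASK_NEED_RECV */
	bool sleeping;			/* models sleeping on xprt->pending */
	int status;
};

static bool retransmit_after_disconnect(const struct model_req *req)
{
	return req->connect_cookie != req->xprt->connect_cookie ||
	       !req->xprt->connected;
}

static void wait_receive(struct model_req *req)
{
	struct model_xprt *xprt = req->xprt;

	if (!atomic_load(&req->need_recv))	/* reply already in */
		return;
	pthread_mutex_lock(&xprt->queue_lock);
	if (atomic_load(&req->need_recv)) {	/* re-check under the lock */
		req->sleeping = true;
		if (retransmit_after_disconnect(req)) {
			/* wake immediately so the caller can retransmit */
			req->sleeping = false;
			req->status = -ENOTCONN;
		}
	}
	pthread_mutex_unlock(&xprt->queue_lock);
}
```

Comparing connect cookies, rather than only checking whether the
transport is connected, also catches a disconnect followed by a reconnect
between transmission and the sleep.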

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/xprt.h | 1 +
net/sunrpc/clnt.c | 10 +-----
net/sunrpc/xprt.c | 72 ++++++++++++++++++++++++++-----------
3 files changed, 53 insertions(+), 30 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 0250294c904a..4fa2af087cff 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -335,6 +335,7 @@ void xprt_free_slot(struct rpc_xprt *xprt,
void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
bool xprt_prepare_transmit(struct rpc_task *task);
void xprt_request_enqueue_receive(struct rpc_task *task);
+void xprt_request_wait_receive(struct rpc_task *task);
void xprt_transmit(struct rpc_task *task);
void xprt_end_transmit(struct rpc_task *task);
int xprt_adjust_timeout(struct rpc_rqst *req);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 3d6d1b5f9e81..cf09aab11014 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1975,15 +1975,6 @@ call_transmit(struct rpc_task *task)
return;
if (is_retrans)
task->tk_client->cl_stats->rpcretrans++;
- /*
- * On success, ensure that we call xprt_end_transmit() before sleeping
- * in order to allow access to the socket to other RPC requests.
- */
- call_transmit_status(task);
- if (rpc_reply_expected(task))
- return;
- task->tk_action = rpc_exit_task;
- rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
}

/*
@@ -2000,6 +1991,7 @@ call_transmit_status(struct rpc_task *task)
*/
if (task->tk_status == 0) {
xprt_end_transmit(task);
+ xprt_request_wait_receive(task);
return;
}

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index cb3c0f7d5b3d..aefa23c90a2b 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -654,6 +654,22 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

+static unsigned int
+xprt_connect_cookie(struct rpc_xprt *xprt)
+{
+ return READ_ONCE(xprt->connect_cookie);
+}
+
+static bool
+xprt_request_retransmit_after_disconnect(struct rpc_task *task)
+{
+ struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
+
+ return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
+ !xprt_connected(xprt);
+}
+
/**
* xprt_conditional_disconnect - force a transport to disconnect
* @xprt: transport to disconnect
@@ -1000,6 +1016,39 @@ static void xprt_timer(struct rpc_task *task)
task->tk_status = 0;
}

+/**
+ * xprt_request_wait_receive - wait for the reply to an RPC request
+ * @task: RPC task about to send a request
+ *
+ */
+void xprt_request_wait_receive(struct rpc_task *task)
+{
+ struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
+
+ if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
+ return;
+ /*
+ * Sleep on the pending queue if we're expecting a reply.
+ * The spinlock ensures atomicity between the test of
+ * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
+ */
+ spin_lock(&xprt->queue_lock);
+ if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
+ xprt->ops->set_retrans_timeout(task);
+ rpc_sleep_on(&xprt->pending, task, xprt_timer);
+ /*
+ * Send an extra queue wakeup call if the
+ * connection was dropped in case the call to
+ * rpc_sleep_on() raced.
+ */
+ if (xprt_request_retransmit_after_disconnect(task))
+ rpc_wake_up_queued_task_set_status(&xprt->pending,
+ task, -ENOTCONN);
+ }
+ spin_unlock(&xprt->queue_lock);
+}
+
/**
* xprt_prepare_transmit - reserve the transport before sending a request
* @task: RPC task about to send a request
@@ -1019,9 +1068,8 @@ bool xprt_prepare_transmit(struct rpc_task *task)
task->tk_status = req->rq_reply_bytes_recvd;
goto out_unlock;
}
- if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
- && xprt_connected(xprt)
- && req->rq_connect_cookie == xprt->connect_cookie) {
+ if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
+ !xprt_request_retransmit_after_disconnect(task)) {
xprt->ops->set_retrans_timeout(task);
rpc_sleep_on(&xprt->pending, task, xprt_timer);
goto out_unlock;
@@ -1082,8 +1130,6 @@ void xprt_transmit(struct rpc_task *task)
task->tk_flags |= RPC_TASK_SENT;
spin_lock_bh(&xprt->transport_lock);

- xprt->ops->set_retrans_timeout(task);
-
xprt->stat.sends++;
xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
xprt->stat.bklog_u += xprt->backlog.qlen;
@@ -1092,22 +1138,6 @@ void xprt_transmit(struct rpc_task *task)
spin_unlock_bh(&xprt->transport_lock);

req->rq_connect_cookie = connect_cookie;
- if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
- /*
- * Sleep on the pending queue if we're expecting a reply.
- * The spinlock ensures atomicity between the test of
- * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
- */
- spin_lock(&xprt->queue_lock);
- if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
- rpc_sleep_on(&xprt->pending, task, xprt_timer);
- /* Wake up immediately if the connection was dropped */
- if (!xprt_connected(xprt))
- rpc_wake_up_queued_task_set_status(&xprt->pending,
- task, -ENOTCONN);
- }
- spin_unlock(&xprt->queue_lock);
- }
}

static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
--
2.17.1

2018-09-03 19:51:04

by Trond Myklebust

Subject: [PATCH 26/27] SUNRPC: Queue the request for transmission immediately after encoding

Move up the call to xprt_request_enqueue_transmit() to call_encode()
so that the queue order reflects the order in which slots were
allocated.

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/clnt.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 586976c4c02a..8eea3c4d2532 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1785,6 +1785,7 @@ call_encode(struct rpc_task *task)
if (rpc_reply_expected(task))
xprt_request_enqueue_receive(task);
set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+ xprt_request_enqueue_transmit(task);
out:
task->tk_action = call_bind;
}
@@ -1971,7 +1972,6 @@ call_transmit(struct rpc_task *task)
dprint_status(task);

task->tk_action = call_transmit_status;
- xprt_request_enqueue_transmit(task);
if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
return;

--
2.17.1

2018-09-03 19:50:54

by Trond Myklebust

Subject: [PATCH 16/27] SUNRPC: Minor cleanup for call_transmit()

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/clnt.c | 33 +++++++++++++++------------------
1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index cf09aab11014..5fbd9875544e 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1946,9 +1946,7 @@ call_transmit(struct rpc_task *task)

dprint_status(task);

- task->tk_action = call_status;
- if (task->tk_status < 0)
- return;
+ task->tk_action = call_transmit_status;
/* Encode here so that rpcsec_gss can use correct sequence number. */
if (rpc_task_need_encode(task)) {
rpc_xdr_encode(task);
@@ -1969,9 +1967,8 @@ call_transmit(struct rpc_task *task)

if (!xprt_prepare_transmit(task))
return;
- task->tk_action = call_transmit_status;
xprt_transmit(task);
- if (task->tk_status < 0) {
+ if (task->tk_status < 0)
return;
if (is_retrans)
task->tk_client->cl_stats->rpcretrans++;
@@ -1996,19 +1993,28 @@ call_transmit_status(struct rpc_task *task)
}

switch (task->tk_status) {
- case -EAGAIN:
- case -ENOBUFS:
- break;
default:
dprint_status(task);
xprt_end_transmit(task);
break;
+ case -EBADMSG:
+ clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+ task->tk_action = call_transmit;
+ xprt_end_transmit(task);
+ break;
/*
* Special cases: if we've been waiting on the
* socket's write_space() callback, or if the
* socket just returned a connection error,
* then hold onto the transport lock.
*/
+ case -ENOBUFS:
+ rpc_delay(task, HZ>>2);
+ /* fall through */
+ case -EAGAIN:
+ task->tk_action = call_transmit;
+ task->tk_status = 0;
+ break;
case -ECONNREFUSED:
case -EHOSTDOWN:
case -ENETDOWN:
@@ -2163,22 +2169,13 @@ call_status(struct rpc_task *task)
/* fall through */
case -EPIPE:
case -ENOTCONN:
- task->tk_action = call_bind;
- break;
- case -ENOBUFS:
- rpc_delay(task, HZ>>2);
- /* fall through */
case -EAGAIN:
- task->tk_action = call_transmit;
+ task->tk_action = call_bind;
break;
case -EIO:
/* shutdown or soft timeout */
rpc_exit(task, status);
break;
- case -EBADMSG:
- clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
- task->tk_action = call_transmit;
- break;
default:
if (clnt->cl_chatty)
printk("%s: RPC call returned error %d\n",
--
2.17.1
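
The switch in call_transmit_status() after this patch can be summarised as a small decision table. The sketch below models only the cases visible in the hunk (-EBADMSG, -ENOBUFS, -EAGAIN and the default) and ignores the code before the switch and the connection-error cases. The struct and function names are hypothetical; the 250 ms value corresponds to HZ>>2, i.e. a quarter second of jiffies.

```c
#include <errno.h>

/* Hypothetical summary of what call_transmit_status() decides. */
struct demo_decision {
	int retransmit;		/* go back to call_transmit */
	int reencode;		/* clear NEED_XMIT so the message is re-encoded */
	int delay_ms;		/* rpc_delay() before retrying */
	int release_lock;	/* xprt_end_transmit() */
};

static struct demo_decision demo_transmit_status(int status)
{
	struct demo_decision d = { 0, 0, 0, 0 };

	switch (status) {
	default:
		d.release_lock = 1;
		break;
	case -EBADMSG:
		/* Fell outside the RPCSEC_GSS window: re-encode and resend. */
		d.reencode = 1;
		d.retransmit = 1;
		d.release_lock = 1;
		break;
	case -ENOBUFS:
		d.delay_ms = 250;	/* HZ >> 2 */
		/* fall through */
	case -EAGAIN:
		/* Retry the transmission while holding onto the transport lock. */
		d.retransmit = 1;
		break;
	}
	return d;
}
```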

2018-09-03 19:50:58

by Trond Myklebust

Subject: [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request()

Once we shift to using the transmit queue, the task that holds the
write lock will not necessarily be the same as the one being transmitted.
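
A minimal sketch of the consequence, assuming simplified hypothetical types: per-request state (here the "already sent" flag that xs_tcp_send_request() uses to disable zerocopy on retransmission) must be read through the request's owning task, not through the task that happens to hold the lock. The sketch mirrors the change from task->tk_flags to req->rq_task->tk_flags; demo_may_zerocopy() is not a kernel function.

```c
#include <stdbool.h>

struct demo_task {
	int pid;
	bool sent;			/* stand-in for RPC_TASK_SENT in tk_flags */
};

struct demo_rqst {
	struct demo_task *owner;	/* stand-in for rq_task */
};

/*
 * Hypothetical helper shaped like the new send_request(req, task) op.
 * @holder holds the transport write lock; with a transmit queue it may
 * not own @req, so per-request state is read via req->owner.
 */
static bool demo_may_zerocopy(const struct demo_rqst *req,
			      const struct demo_task *holder)
{
	(void)holder;
	/* A retransmission must not use zerocopy. */
	return !req->owner->sent;
}
```

Reading holder->sent instead would wrongly allow zerocopy when the lock holder has never sent anything but the queued request it is transmitting has.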

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/xprt.h | 2 +-
net/sunrpc/xprt.c | 2 +-
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 3 +--
net/sunrpc/xprtrdma/transport.c | 5 ++--
net/sunrpc/xprtsock.c | 27 +++++++++++-----------
5 files changed, 18 insertions(+), 21 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 81a6c2c8dfc7..6d91acfe0644 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -140,7 +140,7 @@ struct rpc_xprt_ops {
void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
int (*buf_alloc)(struct rpc_task *task);
void (*buf_free)(struct rpc_task *task);
- int (*send_request)(struct rpc_task *task);
+ int (*send_request)(struct rpc_rqst *req, struct rpc_task *task);
void (*set_retrans_timeout)(struct rpc_task *task);
void (*timer)(struct rpc_xprt *xprt, struct rpc_task *task);
void (*release_request)(struct rpc_task *task);
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index defc6167570a..3efcb69af526 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1170,7 +1170,7 @@ void xprt_transmit(struct rpc_task *task)
}

connect_cookie = xprt->connect_cookie;
- status = xprt->ops->send_request(task);
+ status = xprt->ops->send_request(req, task);
trace_xprt_transmit(xprt, req->rq_xid, status);
if (status != 0) {
task->tk_status = status;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 09b12b7568fe..d1618c70edb4 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -215,9 +215,8 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
* connection.
*/
static int
-xprt_rdma_bc_send_request(struct rpc_task *task)
+xprt_rdma_bc_send_request(struct rpc_rqst *rqst, struct rpc_task *task)
{
- struct rpc_rqst *rqst = task->tk_rqstp;
struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
struct svcxprt_rdma *rdma;
int ret;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 143ce2579ba9..fa684bf4d090 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -706,9 +706,8 @@ xprt_rdma_free(struct rpc_task *task)
* sent. Do not try to send this message again.
*/
static int
-xprt_rdma_send_request(struct rpc_task *task)
+xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task *task)
{
- struct rpc_rqst *rqst = task->tk_rqstp;
struct rpc_xprt *xprt = rqst->rq_xprt;
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
@@ -741,7 +740,7 @@ xprt_rdma_send_request(struct rpc_task *task)
/* An RPC with no reply will throw off credit accounting,
* so drop the connection to reset the credit grant.
*/
- if (!rpc_reply_expected(task))
+ if (!rpc_reply_expected(rqst->rq_task))
goto drop_connection;
return 0;

diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 8d6404259ff9..b8143eded4af 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -449,12 +449,12 @@ static void xs_nospace_callback(struct rpc_task *task)

/**
* xs_nospace - place task on wait queue if transmit was incomplete
+ * @req: pointer to RPC request
* @task: task to put to sleep
*
*/
-static int xs_nospace(struct rpc_task *task)
+static int xs_nospace(struct rpc_rqst *req, struct rpc_task *task)
{
- struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct sock *sk = transport->inet;
@@ -513,6 +513,7 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)

/**
* xs_local_send_request - write an RPC request to an AF_LOCAL socket
+ * @req: pointer to RPC request
* @task: RPC task that manages the state of an RPC request
*
* Return values:
@@ -522,9 +523,8 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
* ENOTCONN: Caller needs to invoke connect logic then call again
* other: Some other error occured, the request was not sent
*/
-static int xs_local_send_request(struct rpc_task *task)
+static int xs_local_send_request(struct rpc_rqst *req, struct rpc_task *task)
{
- struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
struct sock_xprt *transport =
container_of(xprt, struct sock_xprt, xprt);
@@ -569,7 +569,7 @@ static int xs_local_send_request(struct rpc_task *task)
case -ENOBUFS:
break;
case -EAGAIN:
- status = xs_nospace(task);
+ status = xs_nospace(req, task);
break;
default:
dprintk("RPC: sendmsg returned unrecognized error %d\n",
@@ -585,6 +585,7 @@ static int xs_local_send_request(struct rpc_task *task)

/**
* xs_udp_send_request - write an RPC request to a UDP socket
+ * @req: pointer to RPC request
* @task: address of RPC task that manages the state of an RPC request
*
* Return values:
@@ -594,9 +595,8 @@ static int xs_local_send_request(struct rpc_task *task)
* ENOTCONN: Caller needs to invoke connect logic then call again
* other: Some other error occurred, the request was not sent
*/
-static int xs_udp_send_request(struct rpc_task *task)
+static int xs_udp_send_request(struct rpc_rqst *req, struct rpc_task *task)
{
- struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *xdr = &req->rq_snd_buf;
@@ -638,7 +638,7 @@ static int xs_udp_send_request(struct rpc_task *task)
/* Should we call xs_close() here? */
break;
case -EAGAIN:
- status = xs_nospace(task);
+ status = xs_nospace(req, task);
break;
case -ENETUNREACH:
case -ENOBUFS:
@@ -658,6 +658,7 @@ static int xs_udp_send_request(struct rpc_task *task)

/**
* xs_tcp_send_request - write an RPC request to a TCP socket
+ * @req: pointer to RPC request
* @task: address of RPC task that manages the state of an RPC request
*
* Return values:
@@ -670,9 +671,8 @@ static int xs_udp_send_request(struct rpc_task *task)
* XXX: In the case of soft timeouts, should we eventually give up
* if sendmsg is not able to make progress?
*/
-static int xs_tcp_send_request(struct rpc_task *task)
+static int xs_tcp_send_request(struct rpc_rqst *req, struct rpc_task *task)
{
- struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *xdr = &req->rq_snd_buf;
@@ -697,7 +697,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
* completes while the socket holds a reference to the pages,
* then we may end up resending corrupted data.
*/
- if (task->tk_flags & RPC_TASK_SENT)
+ if (req->rq_task->tk_flags & RPC_TASK_SENT)
zerocopy = false;

if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
@@ -761,7 +761,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
/* Should we call xs_close() here? */
break;
case -EAGAIN:
- status = xs_nospace(task);
+ status = xs_nospace(req, task);
break;
case -ECONNRESET:
case -ECONNREFUSED:
@@ -2706,9 +2706,8 @@ static int bc_sendto(struct rpc_rqst *req)
/*
* The send routine. Borrows from svc_send
*/
-static int bc_send_request(struct rpc_task *task)
+static int bc_send_request(struct rpc_rqst *req, struct rpc_task *task)
{
- struct rpc_rqst *req = task->tk_rqstp;
struct svc_xprt *xprt;
int len;

--
2.17.1

2018-09-03 19:50:40

by Trond Myklebust

Subject: [PATCH 01/27] SUNRPC: Clean up initialisation of the struct rpc_rqst

Move the initialisation back into xprt.c.
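
The relocated code follows a simple pattern: XIDs are handed out sequentially from a randomly seeded counter, and a request is only initialised if slot allocation actually produced one (xprt_do_reserve()). Below is a userspace sketch with hypothetical demo_* names. Assumptions: the caller supplies the seed instead of prandom_u32(), the byte-order cast to __be32 is dropped, and the xprt->reserve_lock locking is omitted.

```c
#include <stddef.h>
#include <stdint.h>

struct demo_xprt {
	uint32_t xid;			/* next XID to hand out */
};

struct demo_rqst {
	uint32_t xid;
	int initialised;
};

/* The kernel seeds from prandom_u32(); here the caller passes the seed. */
static void demo_init_xid(struct demo_xprt *xprt, uint32_t seed)
{
	xprt->xid = seed;
}

/* Like xprt_alloc_xid(), minus the reserve_lock: sequential XIDs. */
static uint32_t demo_alloc_xid(struct demo_xprt *xprt)
{
	return xprt->xid++;		/* unsigned, so it wraps at 2^32 */
}

/*
 * Like xprt_do_reserve(): initialise the request only if a slot was
 * obtained. @slot stands in for the result of ->alloc_slot().
 */
static struct demo_rqst *demo_do_reserve(struct demo_xprt *xprt,
					 struct demo_rqst *slot)
{
	if (slot != NULL) {
		slot->xid = demo_alloc_xid(xprt);
		slot->initialised = 1;
	}
	return slot;
}
```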

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/xprt.h | 1 -
net/sunrpc/clnt.c | 1 -
net/sunrpc/xprt.c | 91 +++++++++++++++++++++----------------
3 files changed, 51 insertions(+), 42 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 336fd1a19cca..3d80524e92d6 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -325,7 +325,6 @@ struct xprt_class {
struct rpc_xprt *xprt_create_transport(struct xprt_create *args);
void xprt_connect(struct rpc_task *task);
void xprt_reserve(struct rpc_task *task);
-void xprt_request_init(struct rpc_task *task);
void xprt_retry_reserve(struct rpc_task *task);
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 8ea2f5fadd96..bc9d020bf71f 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1558,7 +1558,6 @@ call_reserveresult(struct rpc_task *task)
task->tk_status = 0;
if (status >= 0) {
if (task->tk_rqstp) {
- xprt_request_init(task);
task->tk_action = call_refresh;
return;
}
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index a8db2e3f8904..6aa09edc9567 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1250,6 +1250,55 @@ void xprt_free(struct rpc_xprt *xprt)
}
EXPORT_SYMBOL_GPL(xprt_free);

+static __be32
+xprt_alloc_xid(struct rpc_xprt *xprt)
+{
+ __be32 xid;
+
+ spin_lock(&xprt->reserve_lock);
+ xid = (__force __be32)xprt->xid++;
+ spin_unlock(&xprt->reserve_lock);
+ return xid;
+}
+
+static void
+xprt_init_xid(struct rpc_xprt *xprt)
+{
+ xprt->xid = prandom_u32();
+}
+
+static void
+xprt_request_init(struct rpc_task *task)
+{
+ struct rpc_xprt *xprt = task->tk_xprt;
+ struct rpc_rqst *req = task->tk_rqstp;
+
+ INIT_LIST_HEAD(&req->rq_list);
+ req->rq_timeout = task->tk_client->cl_timeout->to_initval;
+ req->rq_task = task;
+ req->rq_xprt = xprt;
+ req->rq_buffer = NULL;
+ req->rq_xid = xprt_alloc_xid(xprt);
+ req->rq_connect_cookie = xprt->connect_cookie - 1;
+ req->rq_bytes_sent = 0;
+ req->rq_snd_buf.len = 0;
+ req->rq_snd_buf.buflen = 0;
+ req->rq_rcv_buf.len = 0;
+ req->rq_rcv_buf.buflen = 0;
+ req->rq_release_snd_buf = NULL;
+ xprt_reset_majortimeo(req);
+ dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
+ req, ntohl(req->rq_xid));
+}
+
+static void
+xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+ xprt->ops->alloc_slot(xprt, task);
+ if (task->tk_rqstp != NULL)
+ xprt_request_init(task);
+}
+
/**
* xprt_reserve - allocate an RPC request slot
* @task: RPC task requesting a slot allocation
@@ -1269,7 +1318,7 @@ void xprt_reserve(struct rpc_task *task)
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
if (!xprt_throttle_congested(xprt, task))
- xprt->ops->alloc_slot(xprt, task);
+ xprt_do_reserve(xprt, task);
}

/**
@@ -1291,45 +1340,7 @@ void xprt_retry_reserve(struct rpc_task *task)

task->tk_timeout = 0;
task->tk_status = -EAGAIN;
- xprt->ops->alloc_slot(xprt, task);
-}
-
-static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
-{
- __be32 xid;
-
- spin_lock(&xprt->reserve_lock);
- xid = (__force __be32)xprt->xid++;
- spin_unlock(&xprt->reserve_lock);
- return xid;
-}
-
-static inline void xprt_init_xid(struct rpc_xprt *xprt)
-{
- xprt->xid = prandom_u32();
-}
-
-void xprt_request_init(struct rpc_task *task)
-{
- struct rpc_xprt *xprt = task->tk_xprt;
- struct rpc_rqst *req = task->tk_rqstp;
-
- INIT_LIST_HEAD(&req->rq_list);
- req->rq_timeout = task->tk_client->cl_timeout->to_initval;
- req->rq_task = task;
- req->rq_xprt = xprt;
- req->rq_buffer = NULL;
- req->rq_xid = xprt_alloc_xid(xprt);
- req->rq_connect_cookie = xprt->connect_cookie - 1;
- req->rq_bytes_sent = 0;
- req->rq_snd_buf.len = 0;
- req->rq_snd_buf.buflen = 0;
- req->rq_rcv_buf.len = 0;
- req->rq_rcv_buf.buflen = 0;
- req->rq_release_snd_buf = NULL;
- xprt_reset_majortimeo(req);
- dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
- req, ntohl(req->rq_xid));
+ xprt_do_reserve(xprt, task);
}

/**
--
2.17.1

2018-09-03 19:50:57

by Trond Myklebust

Subject: [PATCH 18/27] NFS: Add a transmission queue for RPC requests

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/xprt.h | 6 +++
net/sunrpc/backchannel_rqst.c | 1 +
net/sunrpc/clnt.c | 6 +--
net/sunrpc/xprt.c | 74 +++++++++++++++++++++++++++----
net/sunrpc/xprtrdma/backchannel.c | 1 +
5 files changed, 77 insertions(+), 11 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 9cec2d0811f2..81a6c2c8dfc7 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -88,6 +88,8 @@ struct rpc_rqst {
struct list_head rq_recv; /* Receive queue */
};

+ struct list_head rq_xmit; /* Send queue */
+
void *rq_buffer; /* Call XDR encode buffer */
size_t rq_callsize;
void *rq_rbuffer; /* Reply XDR decode buffer */
@@ -242,6 +244,9 @@ struct rpc_xprt {
spinlock_t queue_lock; /* send/receive queue lock */
u32 xid; /* Next XID value to use */
struct rpc_task * snd_task; /* Task blocked in send */
+
+ struct list_head xmit_queue; /* Send queue */
+
struct svc_xprt *bc_xprt; /* NFSv4.1 backchannel */
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
struct svc_serv *bc_serv; /* The RPC service which will */
@@ -339,6 +344,7 @@ void xprt_free_slot(struct rpc_xprt *xprt,
struct rpc_rqst *req);
void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
bool xprt_prepare_transmit(struct rpc_task *task);
+void xprt_request_enqueue_transmit(struct rpc_task *task);
void xprt_request_enqueue_receive(struct rpc_task *task);
void xprt_request_wait_receive(struct rpc_task *task);
void xprt_transmit(struct rpc_task *task);
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 92e9ad30ec2f..39b394b7dae3 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -92,6 +92,7 @@ struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt, gfp_t gfp_flags)

req->rq_xprt = xprt;
INIT_LIST_HEAD(&req->rq_recv);
+ INIT_LIST_HEAD(&req->rq_xmit);
INIT_LIST_HEAD(&req->rq_bc_list);

/* Preallocate one XDR receive buffer */
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 5fbd9875544e..a817f70d6192 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1759,8 +1759,6 @@ rpc_xdr_encode(struct rpc_task *task)

task->tk_status = rpcauth_wrap_req(task, encode, req, p,
task->tk_msg.rpc_argp);
- if (task->tk_status == 0)
- set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/*
@@ -1959,11 +1957,13 @@ call_transmit(struct rpc_task *task)
rpc_exit(task, task->tk_status);
return;
}
+ set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/* Add task to reply queue before transmission to avoid races */
if (rpc_reply_expected(task))
xprt_request_enqueue_receive(task);
+ xprt_request_enqueue_transmit(task);

if (!xprt_prepare_transmit(task))
return;
@@ -1998,7 +1998,6 @@ call_transmit_status(struct rpc_task *task)
xprt_end_transmit(task);
break;
case -EBADMSG:
- clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
task->tk_action = call_transmit;
xprt_end_transmit(task);
break;
@@ -2049,6 +2048,7 @@ call_bc_transmit(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;

+ xprt_request_enqueue_transmit(task);
if (!xprt_prepare_transmit(task))
goto out_retry;

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 0184a6aed7e3..defc6167570a 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1049,6 +1049,64 @@ void xprt_request_wait_receive(struct rpc_task *task)
spin_unlock(&xprt->queue_lock);
}

+static bool
+xprt_request_need_transmit(struct rpc_task *task)
+{
+ return !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
+ xprt_request_retransmit_after_disconnect(task);
+}
+
+/**
+ * xprt_request_enqueue_transmit - queue a task for transmission
+ * @task: pointer to rpc_task
+ *
+ * Add a task to the transmission queue.
+ */
+void
+xprt_request_enqueue_transmit(struct rpc_task *task)
+{
+ struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
+
+ spin_lock(&xprt->queue_lock);
+ if (list_empty(&req->rq_xmit) && xprt_request_need_transmit(task) &&
+ test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+ list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
+ spin_unlock(&xprt->queue_lock);
+}
+
+/**
+ * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
+ * @task: pointer to rpc_task
+ *
+ * Remove a task from the transmission queue
+ * Caller must hold xprt->queue_lock
+ */
+static void
+xprt_request_dequeue_transmit_locked(struct rpc_task *task)
+{
+ xprt_task_clear_bytes_sent(task);
+ clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+ list_del_init(&task->tk_rqstp->rq_xmit);
+}
+
+/**
+ * xprt_request_dequeue_transmit - remove a task from the transmission queue
+ * @task: pointer to rpc_task
+ *
+ * Remove a task from the transmission queue
+ */
+static void
+xprt_request_dequeue_transmit(struct rpc_task *task)
+{
+ struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
+
+ spin_lock(&xprt->queue_lock);
+ xprt_request_dequeue_transmit_locked(task);
+ spin_unlock(&xprt->queue_lock);
+}
+
/**
* xprt_prepare_transmit - reserve the transport before sending a request
* @task: RPC task about to send a request
@@ -1068,12 +1126,8 @@ bool xprt_prepare_transmit(struct rpc_task *task)
task->tk_status = req->rq_reply_bytes_recvd;
goto out_unlock;
}
- if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
- !xprt_request_retransmit_after_disconnect(task)) {
- xprt->ops->set_retrans_timeout(task);
- rpc_sleep_on(&xprt->pending, task, xprt_timer);
+ if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
goto out_unlock;
- }
}
if (!xprt->ops->reserve_xprt(xprt, task)) {
task->tk_status = -EAGAIN;
@@ -1107,11 +1161,11 @@ void xprt_transmit(struct rpc_task *task)

if (!req->rq_bytes_sent) {
if (xprt_request_data_received(task))
- return;
+ goto out_dequeue;
/* Verify that our message lies in the RPCSEC_GSS window */
if (rpcauth_xmit_need_reencode(task)) {
task->tk_status = -EBADMSG;
- return;
+ goto out_dequeue;
}
}

@@ -1126,7 +1180,6 @@ void xprt_transmit(struct rpc_task *task)
xprt_inject_disconnect(xprt);

dprintk("RPC: %5u xmit complete\n", task->tk_pid);
- clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
task->tk_flags |= RPC_TASK_SENT;
spin_lock_bh(&xprt->transport_lock);

@@ -1138,6 +1191,8 @@ void xprt_transmit(struct rpc_task *task)
spin_unlock_bh(&xprt->transport_lock);

req->rq_connect_cookie = connect_cookie;
+out_dequeue:
+ xprt_request_dequeue_transmit(task);
}

static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
@@ -1338,6 +1393,7 @@ xprt_request_init(struct rpc_task *task)
struct rpc_rqst *req = task->tk_rqstp;

INIT_LIST_HEAD(&req->rq_recv);
+ INIT_LIST_HEAD(&req->rq_xmit);
req->rq_timeout = task->tk_client->cl_timeout->to_initval;
req->rq_task = task;
req->rq_xprt = xprt;
@@ -1433,6 +1489,7 @@ void xprt_release(struct rpc_task *task)
rpc_count_iostats(task, task->tk_client->cl_metrics);
spin_lock(&xprt->queue_lock);
xprt_request_dequeue_receive_locked(task);
+ xprt_request_dequeue_transmit_locked(task);
while (xprt_is_pinned_rqst(req)) {
spin_unlock(&xprt->queue_lock);
xprt_wait_on_pinned_rqst(req);
@@ -1472,6 +1529,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)

INIT_LIST_HEAD(&xprt->free);
INIT_LIST_HEAD(&xprt->recv_queue);
+ INIT_LIST_HEAD(&xprt->xmit_queue);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
spin_lock_init(&xprt->bc_pa_lock);
INIT_LIST_HEAD(&xprt->bc_pa_list);
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 40c7c7306a99..fc01fdabbbce 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -52,6 +52,7 @@ static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,

rqst->rq_xprt = xprt;
INIT_LIST_HEAD(&rqst->rq_recv);
+ INIT_LIST_HEAD(&rqst->rq_xmit);
INIT_LIST_HEAD(&rqst->rq_bc_list);
__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
spin_lock_bh(&xprt->bc_pa_lock);
--
2.17.1
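
The enqueue/dequeue helpers in this patch rely on a list_head property: after INIT_LIST_HEAD() or list_del_init(), a node points to itself, so list_empty(&req->rq_xmit) means "not queued". That makes enqueueing idempotent and lets xprt_release() dequeue unconditionally. The sketch below reimplements a tiny circular list in userspace to show this; all demo_* names are hypothetical and xprt->queue_lock is omitted.

```c
#include <stdbool.h>

/* Minimal circular doubly linked list in the style of <linux/list.h>. */
struct demo_list {
	struct demo_list *next, *prev;
};

static void demo_list_init(struct demo_list *l)
{
	l->next = l->prev = l;
}

static bool demo_list_empty(const struct demo_list *l)
{
	return l->next == l;
}

static void demo_list_add_tail(struct demo_list *n, struct demo_list *head)
{
	n->prev = head->prev;
	n->next = head;
	head->prev->next = n;
	head->prev = n;
}

static void demo_list_del_init(struct demo_list *n)
{
	n->prev->next = n->next;
	n->next->prev = n->prev;
	demo_list_init(n);	/* self-linked again: "not queued" */
}

struct demo_rqst {
	struct demo_list xmit;	/* like rq_xmit */
	bool need_xmit;		/* like RPC_TASK_NEED_XMIT */
};

/* Enqueue at most once, and only if the request still needs sending. */
static void demo_enqueue_transmit(struct demo_rqst *req,
				  struct demo_list *queue)
{
	if (demo_list_empty(&req->xmit) && req->need_xmit)
		demo_list_add_tail(&req->xmit, queue);
}

/* Safe to call whether or not the request is currently queued. */
static void demo_dequeue_transmit(struct demo_rqst *req)
{
	req->need_xmit = false;
	demo_list_del_init(&req->xmit);
}
```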

2018-09-03 19:50:45

by Trond Myklebust

Subject: [PATCH 07/27] SUNRPC: Move reset of TCP state variables into the reconnect code

Rather than resetting the state variables in the socket state_change()
callback, do it in the sunrpc TCP connect function itself.
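
A minimal sketch of the reset that moves into xs_tcp_finish_connecting(): before the socket layer is told to connect, the receive parser is put back into "expect a record marker, then an XID" state. The struct mirrors the recv fields introduced by the rename patch; the flag values and demo_* names are hypothetical, not the kernel's TCP_RCV_* definitions.

```c
#include <stdint.h>

/* Hypothetical flag values standing in for the TCP_RCV_* constants. */
#define DEMO_RCV_COPY_FRAGHDR	(1UL << 0)
#define DEMO_RCV_COPY_XID	(1UL << 1)
#define DEMO_RCV_COPY_DATA	(1UL << 2)

struct demo_recv_state {
	uint32_t offset, len;
	unsigned long copied, flags;
};

/*
 * Called on the connect path, before the new connection can deliver
 * data, so parsing restarts at the beginning of a record.
 */
static void demo_reset_recv(struct demo_recv_state *r)
{
	r->offset = 0;
	r->len = 0;
	r->copied = 0;
	r->flags = DEMO_RCV_COPY_FRAGHDR | DEMO_RCV_COPY_XID;
}
```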

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/xprtsock.c | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index cd7d093721ae..ec1e3f93e707 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1600,13 +1600,6 @@ static void xs_tcp_state_change(struct sock *sk)
case TCP_ESTABLISHED:
spin_lock(&xprt->transport_lock);
if (!xprt_test_and_set_connected(xprt)) {
-
- /* Reset TCP record info */
- transport->recv.offset = 0;
- transport->recv.len = 0;
- transport->recv.copied = 0;
- transport->recv.flags =
- TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
xprt->connect_cookie++;
clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
xprt_clear_connecting(xprt);
@@ -2386,6 +2379,12 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)

xs_set_memalloc(xprt);

+ /* Reset TCP record info */
+ transport->recv.offset = 0;
+ transport->recv.len = 0;
+ transport->recv.copied = 0;
+ transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
+
/* Tell the socket layer to start connecting... */
xprt->stat.connect_count++;
xprt->stat.connect_start = jiffies;
--
2.17.1

2018-09-03 19:50:58

by Trond Myklebust

Subject: [PATCH 19/27] SUNRPC: Refactor RPC call encoding

Move the call encoding out of call_transmit() and into a new
call_encode() step, so that it occurs before the transport bind and
connect steps.
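
The RPC client runs as a state machine in which each step sets tk_action to the next one. After this patch the order becomes call_allocate -> call_encode -> call_bind -> ... -> call_transmit, and retry paths re-enter at call_encode, which skips encoding while RPC_TASK_NEED_XMIT is still set (rpc_task_need_encode()). The userspace sketch below models just that chain; the demo_* names are hypothetical, and bind/connect/status handling is collapsed into single steps.

```c
#include <stdbool.h>
#include <stddef.h>

enum demo_state { DEMO_ALLOCATE, DEMO_ENCODE, DEMO_BIND, DEMO_TRANSMIT };

struct demo_task;
typedef void (*demo_action)(struct demo_task *);

struct demo_task {
	demo_action action;		/* stand-in for tk_action */
	bool need_xmit;			/* stand-in for RPC_TASK_NEED_XMIT */
	unsigned int encodes;		/* how many times we encoded */
	enum demo_state trace[8];	/* states visited, in order */
	size_t ntrace;
};

static void demo_call_encode(struct demo_task *t);
static void demo_call_bind(struct demo_task *t);
static void demo_call_transmit(struct demo_task *t);

static void demo_record(struct demo_task *t, enum demo_state s)
{
	if (t->ntrace < sizeof(t->trace) / sizeof(t->trace[0]))
		t->trace[t->ntrace++] = s;
}

static void demo_call_allocate(struct demo_task *t)
{
	demo_record(t, DEMO_ALLOCATE);
	t->action = demo_call_encode;	/* previously went to call_bind */
}

static void demo_call_encode(struct demo_task *t)
{
	demo_record(t, DEMO_ENCODE);
	if (!t->need_xmit) {		/* cf. rpc_task_need_encode() */
		t->encodes++;
		t->need_xmit = true;
	}
	t->action = demo_call_bind;
}

static void demo_call_bind(struct demo_task *t)
{
	demo_record(t, DEMO_BIND);	/* bind/connect details elided */
	t->action = demo_call_transmit;
}

static void demo_call_transmit(struct demo_task *t)
{
	demo_record(t, DEMO_TRANSMIT);
	t->need_xmit = false;		/* message sent */
	t->action = NULL;		/* end of this sketch */
}

static void demo_run(struct demo_task *t)
{
	while (t->action != NULL)
		t->action(t);
}
```

A retry that re-enters at demo_call_encode while need_xmit is still set (for example after -EAGAIN before the message went out) passes straight through to bind without encoding again.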

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/clnt.c | 67 +++++++++++++++++++++++++++--------------------
1 file changed, 39 insertions(+), 28 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index a817f70d6192..497a30762a6d 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -61,6 +61,7 @@ static void call_start(struct rpc_task *task);
static void call_reserve(struct rpc_task *task);
static void call_reserveresult(struct rpc_task *task);
static void call_allocate(struct rpc_task *task);
+static void call_encode(struct rpc_task *task);
static void call_decode(struct rpc_task *task);
static void call_bind(struct rpc_task *task);
static void call_bind_status(struct rpc_task *task);
@@ -1680,7 +1681,7 @@ call_allocate(struct rpc_task *task)
dprint_status(task);

task->tk_status = 0;
- task->tk_action = call_bind;
+ task->tk_action = call_encode;

if (req->rq_buffer)
return;
@@ -1727,9 +1728,6 @@ rpc_task_need_encode(struct rpc_task *task)
return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0;
}

-/*
- * 3. Encode arguments of an RPC call
- */
static void
rpc_xdr_encode(struct rpc_task *task)
{
@@ -1745,6 +1743,7 @@ rpc_xdr_encode(struct rpc_task *task)
xdr_buf_init(&req->rq_rcv_buf,
req->rq_rbuffer,
req->rq_rcvsize);
+ req->rq_bytes_sent = 0;

p = rpc_encode_header(task);
if (p == NULL) {
@@ -1761,6 +1760,34 @@ rpc_xdr_encode(struct rpc_task *task)
task->tk_msg.rpc_argp);
}

+/*
+ * 3. Encode arguments of an RPC call
+ */
+static void
+call_encode(struct rpc_task *task)
+{
+ if (!rpc_task_need_encode(task))
+ goto out;
+ /* Encode here so that rpcsec_gss can use correct sequence number. */
+ rpc_xdr_encode(task);
+ /* Did the encode result in an error condition? */
+ if (task->tk_status != 0) {
+ /* Was the error nonfatal? */
+ if (task->tk_status == -EAGAIN)
+ rpc_delay(task, HZ >> 4);
+ else
+ rpc_exit(task, task->tk_status);
+ return;
+ }
+
+ /* Add task to reply queue before transmission to avoid races */
+ if (rpc_reply_expected(task))
+ xprt_request_enqueue_receive(task);
+ set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+out:
+ task->tk_action = call_bind;
+}
+
/*
* 4. Get the server port number if not yet set
*/
@@ -1945,25 +1972,9 @@ call_transmit(struct rpc_task *task)
dprint_status(task);

task->tk_action = call_transmit_status;
- /* Encode here so that rpcsec_gss can use correct sequence number. */
- if (rpc_task_need_encode(task)) {
- rpc_xdr_encode(task);
- /* Did the encode result in an error condition? */
- if (task->tk_status != 0) {
- /* Was the error nonfatal? */
- if (task->tk_status == -EAGAIN)
- rpc_delay(task, HZ >> 4);
- else
- rpc_exit(task, task->tk_status);
- return;
- }
- set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
- }
-
- /* Add task to reply queue before transmission to avoid races */
- if (rpc_reply_expected(task))
- xprt_request_enqueue_receive(task);
xprt_request_enqueue_transmit(task);
+ if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+ return;

if (!xprt_prepare_transmit(task))
return;
@@ -1998,8 +2009,8 @@ call_transmit_status(struct rpc_task *task)
xprt_end_transmit(task);
break;
case -EBADMSG:
- task->tk_action = call_transmit;
xprt_end_transmit(task);
+ task->tk_action = call_encode;
break;
/*
* Special cases: if we've been waiting on the
@@ -2170,7 +2181,7 @@ call_status(struct rpc_task *task)
case -EPIPE:
case -ENOTCONN:
case -EAGAIN:
- task->tk_action = call_bind;
+ task->tk_action = call_encode;
break;
case -EIO:
/* shutdown or soft timeout */
@@ -2235,7 +2246,7 @@ call_timeout(struct rpc_task *task)
rpcauth_invalcred(task);

retry:
- task->tk_action = call_bind;
+ task->tk_action = call_encode;
task->tk_status = 0;
}

@@ -2279,7 +2290,7 @@ call_decode(struct rpc_task *task)

if (req->rq_rcv_buf.len < 12) {
if (!RPC_IS_SOFT(task)) {
- task->tk_action = call_bind;
+ task->tk_action = call_encode;
goto out_retry;
}
dprintk("RPC: %s: too small RPC reply size (%d bytes)\n",
@@ -2410,7 +2421,7 @@ rpc_verify_header(struct rpc_task *task)
task->tk_garb_retry--;
dprintk("RPC: %5u %s: retry garbled creds\n",
task->tk_pid, __func__);
- task->tk_action = call_bind;
+ task->tk_action = call_encode;
goto out_retry;
case RPC_AUTH_TOOWEAK:
printk(KERN_NOTICE "RPC: server %s requires stronger "
@@ -2479,7 +2490,7 @@ rpc_verify_header(struct rpc_task *task)
task->tk_garb_retry--;
dprintk("RPC: %5u %s: retrying\n",
task->tk_pid, __func__);
- task->tk_action = call_bind;
+ task->tk_action = call_encode;
out_retry:
return ERR_PTR(-EAGAIN);
}
--
2.17.1

2018-09-03 19:50:41

by Trond Myklebust

Subject: [PATCH 02/27] SUNRPC: If there is no reply expected, bail early from call_decode

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/clnt.c | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index bc9d020bf71f..4f1ec8013332 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -2260,6 +2260,11 @@ call_decode(struct rpc_task *task)

dprint_status(task);

+ if (!decode) {
+ task->tk_action = rpc_exit_task;
+ return;
+ }
+
if (task->tk_flags & RPC_CALL_MAJORSEEN) {
if (clnt->cl_chatty) {
printk(KERN_NOTICE "%s: server %s OK\n",
@@ -2297,13 +2302,11 @@ call_decode(struct rpc_task *task)
goto out_retry;
return;
}
-
task->tk_action = rpc_exit_task;

- if (decode) {
- task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
- task->tk_msg.rpc_resp);
- }
+ task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
+ task->tk_msg.rpc_resp);
+
dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
task->tk_status);
return;
--
2.17.1
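
The effect of this patch is a reordering: the "no decoder" case is now checked first, so a call that expects no reply exits without running any of the reply-size, header or "server OK" logic. A minimal sketch, where the demo_* names are hypothetical and a single counter stands in for all of those checks:

```c
#include <stddef.h>

typedef int (*demo_decode_fn)(const unsigned char *buf, size_t len, int *res);

/* Example decoder: returns the first byte of the reply in *res. */
static int demo_decode_first_byte(const unsigned char *buf, size_t len,
				  int *res)
{
	if (len == 0)
		return -1;
	*res = buf[0];
	return 0;
}

static int demo_call_decode(demo_decode_fn decode, const unsigned char *buf,
			    size_t len, int *res, unsigned int *checks_run)
{
	if (decode == NULL)
		return 0;	/* no reply expected: finish the task now */
	(*checks_run)++;	/* stand-in for the reply/header checks */
	return decode(buf, len, res);
}
```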

2018-09-03 19:50:45

by Trond Myklebust

Subject: [PATCH 06/27] SUNRPC: Rename TCP receive-specific state variables

Since we will want to introduce similar TCP state variables for the
transmission of requests, let's rename the existing ones to make clear
that they are for the receive side.
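
A sketch of how the grouped fields are used, modelled on xs_tcp_read_fraghdr() after the rename: the 4-byte RPC record marker (RFC 5531 record marking) carries a "last fragment" bit in the top bit and the fragment length in the low 31 bits. The demo_* struct, macro and function names are hypothetical; fraghdr is kept as a uint32_t holding network-order bytes rather than a __be32.

```c
#include <arpa/inet.h>
#include <stdint.h>

#define DEMO_LAST_STREAM_FRAGMENT	0x80000000u
#define DEMO_FRAGMENT_SIZE_MASK		0x7fffffffu

struct demo_sock_xprt {
	/* State of TCP reply receive, grouped under "recv". */
	struct {
		uint32_t fraghdr, xid, calldir;	/* __be32 in the kernel */
		uint32_t offset, len;
		unsigned long copied, flags;
	} recv;
};

/*
 * Decode a complete record marker (in network byte order) into
 * recv.len; returns 1 if this is the last fragment of the record.
 */
static int demo_read_fraghdr(struct demo_sock_xprt *t, uint32_t wire)
{
	int last;

	t->recv.fraghdr = wire;
	t->recv.len = ntohl(t->recv.fraghdr);
	last = (t->recv.len & DEMO_LAST_STREAM_FRAGMENT) != 0;
	t->recv.len &= DEMO_FRAGMENT_SIZE_MASK;
	t->recv.offset = 0;
	return last;
}
```

Grouping the fields under recv leaves room for a parallel transmit-side group without another round of tcp_* prefixes.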

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/xprtsock.h | 16 +--
include/trace/events/sunrpc.h | 10 +-
net/sunrpc/xprtsock.c | 178 ++++++++++++++++----------------
3 files changed, 103 insertions(+), 101 deletions(-)

diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index ae0f99b9b965..90d5ca8e65f4 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -30,15 +30,17 @@ struct sock_xprt {
/*
* State of TCP reply receive
*/
- __be32 tcp_fraghdr,
- tcp_xid,
- tcp_calldir;
+ struct {
+ __be32 fraghdr,
+ xid,
+ calldir;

- u32 tcp_offset,
- tcp_reclen;
+ u32 offset,
+ len;

- unsigned long tcp_copied,
- tcp_flags;
+ unsigned long copied,
+ flags;
+ } recv;

/*
* Connection of transports
diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h
index bbb08a3ef5cc..0aa347194e0f 100644
--- a/include/trace/events/sunrpc.h
+++ b/include/trace/events/sunrpc.h
@@ -525,11 +525,11 @@ TRACE_EVENT(xs_tcp_data_recv,
TP_fast_assign(
__assign_str(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR]);
__assign_str(port, xs->xprt.address_strings[RPC_DISPLAY_PORT]);
- __entry->xid = be32_to_cpu(xs->tcp_xid);
- __entry->flags = xs->tcp_flags;
- __entry->copied = xs->tcp_copied;
- __entry->reclen = xs->tcp_reclen;
- __entry->offset = xs->tcp_offset;
+ __entry->xid = be32_to_cpu(xs->recv.xid);
+ __entry->flags = xs->recv.flags;
+ __entry->copied = xs->recv.copied;
+ __entry->reclen = xs->recv.len;
+ __entry->offset = xs->recv.offset;
),

TP_printk("peer=[%s]:%s xid=0x%08x flags=%s copied=%lu reclen=%u offset=%lu",
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 6b7539c0466e..cd7d093721ae 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1169,42 +1169,42 @@ static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_rea
size_t len, used;
char *p;

- p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
- len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
+ p = ((char *) &transport->recv.fraghdr) + transport->recv.offset;
+ len = sizeof(transport->recv.fraghdr) - transport->recv.offset;
used = xdr_skb_read_bits(desc, p, len);
- transport->tcp_offset += used;
+ transport->recv.offset += used;
if (used != len)
return;

- transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
- if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
- transport->tcp_flags |= TCP_RCV_LAST_FRAG;
+ transport->recv.len = ntohl(transport->recv.fraghdr);
+ if (transport->recv.len & RPC_LAST_STREAM_FRAGMENT)
+ transport->recv.flags |= TCP_RCV_LAST_FRAG;
else
- transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
- transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
+ transport->recv.flags &= ~TCP_RCV_LAST_FRAG;
+ transport->recv.len &= RPC_FRAGMENT_SIZE_MASK;

- transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
- transport->tcp_offset = 0;
+ transport->recv.flags &= ~TCP_RCV_COPY_FRAGHDR;
+ transport->recv.offset = 0;

/* Sanity check of the record length */
- if (unlikely(transport->tcp_reclen < 8)) {
+ if (unlikely(transport->recv.len < 8)) {
dprintk("RPC: invalid TCP record fragment length\n");
xs_tcp_force_close(xprt);
return;
}
dprintk("RPC: reading TCP record fragment of length %d\n",
- transport->tcp_reclen);
+ transport->recv.len);
}

static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
{
- if (transport->tcp_offset == transport->tcp_reclen) {
- transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
- transport->tcp_offset = 0;
- if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
- transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
- transport->tcp_flags |= TCP_RCV_COPY_XID;
- transport->tcp_copied = 0;
+ if (transport->recv.offset == transport->recv.len) {
+ transport->recv.flags |= TCP_RCV_COPY_FRAGHDR;
+ transport->recv.offset = 0;
+ if (transport->recv.flags & TCP_RCV_LAST_FRAG) {
+ transport->recv.flags &= ~TCP_RCV_COPY_DATA;
+ transport->recv.flags |= TCP_RCV_COPY_XID;
+ transport->recv.copied = 0;
}
}
}
@@ -1214,20 +1214,20 @@ static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_r
size_t len, used;
char *p;

- len = sizeof(transport->tcp_xid) - transport->tcp_offset;
+ len = sizeof(transport->recv.xid) - transport->recv.offset;
dprintk("RPC: reading XID (%zu bytes)\n", len);
- p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
+ p = ((char *) &transport->recv.xid) + transport->recv.offset;
used = xdr_skb_read_bits(desc, p, len);
- transport->tcp_offset += used;
+ transport->recv.offset += used;
if (used != len)
return;
- transport->tcp_flags &= ~TCP_RCV_COPY_XID;
- transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
- transport->tcp_copied = 4;
+ transport->recv.flags &= ~TCP_RCV_COPY_XID;
+ transport->recv.flags |= TCP_RCV_READ_CALLDIR;
+ transport->recv.copied = 4;
dprintk("RPC: reading %s XID %08x\n",
- (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
+ (transport->recv.flags & TCP_RPC_REPLY) ? "reply for"
: "request with",
- ntohl(transport->tcp_xid));
+ ntohl(transport->recv.xid));
xs_tcp_check_fraghdr(transport);
}

@@ -1239,34 +1239,34 @@ static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
char *p;

/*
- * We want transport->tcp_offset to be 8 at the end of this routine
+ * We want transport->recv.offset to be 8 at the end of this routine
* (4 bytes for the xid and 4 bytes for the call/reply flag).
* When this function is called for the first time,
- * transport->tcp_offset is 4 (after having already read the xid).
+ * transport->recv.offset is 4 (after having already read the xid).
*/
- offset = transport->tcp_offset - sizeof(transport->tcp_xid);
- len = sizeof(transport->tcp_calldir) - offset;
+ offset = transport->recv.offset - sizeof(transport->recv.xid);
+ len = sizeof(transport->recv.calldir) - offset;
dprintk("RPC: reading CALL/REPLY flag (%zu bytes)\n", len);
- p = ((char *) &transport->tcp_calldir) + offset;
+ p = ((char *) &transport->recv.calldir) + offset;
used = xdr_skb_read_bits(desc, p, len);
- transport->tcp_offset += used;
+ transport->recv.offset += used;
if (used != len)
return;
- transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
+ transport->recv.flags &= ~TCP_RCV_READ_CALLDIR;
/*
* We don't yet have the XDR buffer, so we will write the calldir
* out after we get the buffer from the 'struct rpc_rqst'
*/
- switch (ntohl(transport->tcp_calldir)) {
+ switch (ntohl(transport->recv.calldir)) {
case RPC_REPLY:
- transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
- transport->tcp_flags |= TCP_RCV_COPY_DATA;
- transport->tcp_flags |= TCP_RPC_REPLY;
+ transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
+ transport->recv.flags |= TCP_RCV_COPY_DATA;
+ transport->recv.flags |= TCP_RPC_REPLY;
break;
case RPC_CALL:
- transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
- transport->tcp_flags |= TCP_RCV_COPY_DATA;
- transport->tcp_flags &= ~TCP_RPC_REPLY;
+ transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
+ transport->recv.flags |= TCP_RCV_COPY_DATA;
+ transport->recv.flags &= ~TCP_RPC_REPLY;
break;
default:
dprintk("RPC: invalid request message type\n");
@@ -1287,21 +1287,21 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,

rcvbuf = &req->rq_private_buf;

- if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
+ if (transport->recv.flags & TCP_RCV_COPY_CALLDIR) {
/*
* Save the RPC direction in the XDR buffer
*/
- memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
- &transport->tcp_calldir,
- sizeof(transport->tcp_calldir));
- transport->tcp_copied += sizeof(transport->tcp_calldir);
- transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
+ memcpy(rcvbuf->head[0].iov_base + transport->recv.copied,
+ &transport->recv.calldir,
+ sizeof(transport->recv.calldir));
+ transport->recv.copied += sizeof(transport->recv.calldir);
+ transport->recv.flags &= ~TCP_RCV_COPY_CALLDIR;
}

len = desc->count;
- if (len > transport->tcp_reclen - transport->tcp_offset)
- desc->count = transport->tcp_reclen - transport->tcp_offset;
- r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
+ if (len > transport->recv.len - transport->recv.offset)
+ desc->count = transport->recv.len - transport->recv.offset;
+ r = xdr_partial_copy_from_skb(rcvbuf, transport->recv.copied,
desc, xdr_skb_read_bits);

if (desc->count) {
@@ -1314,31 +1314,31 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
* Any remaining data from this record will
* be discarded.
*/
- transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
+ transport->recv.flags &= ~TCP_RCV_COPY_DATA;
dprintk("RPC: XID %08x truncated request\n",
- ntohl(transport->tcp_xid));
- dprintk("RPC: xprt = %p, tcp_copied = %lu, "
- "tcp_offset = %u, tcp_reclen = %u\n",
- xprt, transport->tcp_copied,
- transport->tcp_offset, transport->tcp_reclen);
+ ntohl(transport->recv.xid));
+ dprintk("RPC: xprt = %p, recv.copied = %lu, "
+ "recv.offset = %u, recv.len = %u\n",
+ xprt, transport->recv.copied,
+ transport->recv.offset, transport->recv.len);
return;
}

- transport->tcp_copied += r;
- transport->tcp_offset += r;
+ transport->recv.copied += r;
+ transport->recv.offset += r;
desc->count = len - r;

dprintk("RPC: XID %08x read %zd bytes\n",
- ntohl(transport->tcp_xid), r);
- dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
- "tcp_reclen = %u\n", xprt, transport->tcp_copied,
- transport->tcp_offset, transport->tcp_reclen);
-
- if (transport->tcp_copied == req->rq_private_buf.buflen)
- transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
- else if (transport->tcp_offset == transport->tcp_reclen) {
- if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
- transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
+ ntohl(transport->recv.xid), r);
+ dprintk("RPC: xprt = %p, recv.copied = %lu, recv.offset = %u, "
+ "recv.len = %u\n", xprt, transport->recv.copied,
+ transport->recv.offset, transport->recv.len);
+
+ if (transport->recv.copied == req->rq_private_buf.buflen)
+ transport->recv.flags &= ~TCP_RCV_COPY_DATA;
+ else if (transport->recv.offset == transport->recv.len) {
+ if (transport->recv.flags & TCP_RCV_LAST_FRAG)
+ transport->recv.flags &= ~TCP_RCV_COPY_DATA;
}
}

@@ -1353,14 +1353,14 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
container_of(xprt, struct sock_xprt, xprt);
struct rpc_rqst *req;

- dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));
+ dprintk("RPC: read reply XID %08x\n", ntohl(transport->recv.xid));

/* Find and lock the request corresponding to this xid */
spin_lock(&xprt->recv_lock);
- req = xprt_lookup_rqst(xprt, transport->tcp_xid);
+ req = xprt_lookup_rqst(xprt, transport->recv.xid);
if (!req) {
dprintk("RPC: XID %08x request not found!\n",
- ntohl(transport->tcp_xid));
+ ntohl(transport->recv.xid));
spin_unlock(&xprt->recv_lock);
return -1;
}
@@ -1370,8 +1370,8 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
xs_tcp_read_common(xprt, desc, req);

spin_lock(&xprt->recv_lock);
- if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
- xprt_complete_rqst(req->rq_task, transport->tcp_copied);
+ if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
+ xprt_complete_rqst(req->rq_task, transport->recv.copied);
xprt_unpin_rqst(req);
spin_unlock(&xprt->recv_lock);
return 0;
@@ -1393,7 +1393,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
struct rpc_rqst *req;

/* Look up the request corresponding to the given XID */
- req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
+ req = xprt_lookup_bc_request(xprt, transport->recv.xid);
if (req == NULL) {
printk(KERN_WARNING "Callback slot table overflowed\n");
xprt_force_disconnect(xprt);
@@ -1403,8 +1403,8 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid));
xs_tcp_read_common(xprt, desc, req);

- if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
- xprt_complete_bc_request(req, transport->tcp_copied);
+ if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
+ xprt_complete_bc_request(req, transport->recv.copied);

return 0;
}
@@ -1415,7 +1415,7 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
struct sock_xprt *transport =
container_of(xprt, struct sock_xprt, xprt);

- return (transport->tcp_flags & TCP_RPC_REPLY) ?
+ return (transport->recv.flags & TCP_RPC_REPLY) ?
xs_tcp_read_reply(xprt, desc) :
xs_tcp_read_callback(xprt, desc);
}
@@ -1458,9 +1458,9 @@ static void xs_tcp_read_data(struct rpc_xprt *xprt,
else {
/*
* The transport_lock protects the request handling.
- * There's no need to hold it to update the tcp_flags.
+ * There's no need to hold it to update the recv.flags.
*/
- transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
+ transport->recv.flags &= ~TCP_RCV_COPY_DATA;
}
}

@@ -1468,12 +1468,12 @@ static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_s
{
size_t len;

- len = transport->tcp_reclen - transport->tcp_offset;
+ len = transport->recv.len - transport->recv.offset;
if (len > desc->count)
len = desc->count;
desc->count -= len;
desc->offset += len;
- transport->tcp_offset += len;
+ transport->recv.offset += len;
dprintk("RPC: discarded %zu bytes\n", len);
xs_tcp_check_fraghdr(transport);
}
@@ -1494,22 +1494,22 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
trace_xs_tcp_data_recv(transport);
/* Read in a new fragment marker if necessary */
/* Can we ever really expect to get completely empty fragments? */
- if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
+ if (transport->recv.flags & TCP_RCV_COPY_FRAGHDR) {
xs_tcp_read_fraghdr(xprt, &desc);
continue;
}
/* Read in the xid if necessary */
- if (transport->tcp_flags & TCP_RCV_COPY_XID) {
+ if (transport->recv.flags & TCP_RCV_COPY_XID) {
xs_tcp_read_xid(transport, &desc);
continue;
}
/* Read in the call/reply flag */
- if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
+ if (transport->recv.flags & TCP_RCV_READ_CALLDIR) {
xs_tcp_read_calldir(transport, &desc);
continue;
}
/* Read in the request data */
- if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
+ if (transport->recv.flags & TCP_RCV_COPY_DATA) {
xs_tcp_read_data(xprt, &desc);
continue;
}
@@ -1602,10 +1602,10 @@ static void xs_tcp_state_change(struct sock *sk)
if (!xprt_test_and_set_connected(xprt)) {

/* Reset TCP record info */
- transport->tcp_offset = 0;
- transport->tcp_reclen = 0;
- transport->tcp_copied = 0;
- transport->tcp_flags =
+ transport->recv.offset = 0;
+ transport->recv.len = 0;
+ transport->recv.copied = 0;
+ transport->recv.flags =
TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
xprt->connect_cookie++;
clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
--
2.17.1

2018-09-03 19:51:02

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH 24/27] SUNRPC: Fix up the back channel transmit

Fix up the back channel code to recognise when the request has already
been transmitted, so that it does not need to be transmitted again.
Also ensure that we set req->rq_task.
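The control flow added to call_bc_transmit() can be modelled as follows. This is a hypothetical user-space sketch: demo_task, demo_rqst and the flag value are stand-ins, and the "transmit" step only clears the flag and counts the send instead of touching a socket.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_TASK_NEED_XMIT (1u << 3) /* mirrors bit 3, RPC_TASK_NEED_XMIT */

struct demo_rqst;

struct demo_task {
	unsigned int runstate;
	struct demo_rqst *rqstp;
};

struct demo_rqst {
	struct demo_task *task; /* back-pointer, like req->rq_task */
	int transmit_count;
};

enum demo_bc_result { DEMO_BC_TRANSMITTED, DEMO_BC_ALREADY_SENT };

/* Link task and request in both directions, as rpc_run_bc_task() now does. */
static void demo_bind(struct demo_task *task, struct demo_rqst *req)
{
	task->rqstp = req;
	req->task = task;
	task->runstate |= DEMO_TASK_NEED_XMIT;
}

/* If the request no longer needs transmitting (e.g. it was already sent
 * from the transmit queue), skip straight to the wake-up path instead of
 * sending it a second time. */
static enum demo_bc_result demo_bc_transmit(struct demo_task *task)
{
	struct demo_rqst *req = task->rqstp;

	assert(req != NULL && req->task == task);
	if (!(task->runstate & DEMO_TASK_NEED_XMIT))
		return DEMO_BC_ALREADY_SENT;
	req->transmit_count++;
	task->runstate &= ~DEMO_TASK_NEED_XMIT;
	return DEMO_BC_TRANSMITTED;
}
```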

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/clnt.c | 5 +++++
1 file changed, 5 insertions(+)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index fb40d1e9f636..586976c4c02a 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1150,6 +1150,7 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
*/
task = rpc_new_task(&task_setup_data);
task->tk_rqstp = req;
+ req->rq_task = task;

/*
* Set up the xdr_buf length.
@@ -2054,6 +2055,9 @@ call_bc_transmit(struct rpc_task *task)
struct rpc_rqst *req = task->tk_rqstp;

xprt_request_enqueue_transmit(task);
+ if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+ goto out_wakeup;
+
if (!xprt_prepare_transmit(task))
goto out_retry;

@@ -2108,6 +2112,7 @@ call_bc_transmit(struct rpc_task *task)
"error: %d\n", task->tk_status);
break;
}
+out_wakeup:
rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
out_done:
task->tk_action = rpc_exit_task;
--
2.17.1

2018-09-03 19:50:49

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH 10/27] SUNRPC: Refactor the transport request pinning

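We are going to need to pin requests for both send and receive, so
replace the RPC_TASK_MSG_RECV and RPC_TASK_MSG_RECV_WAIT bits with a
reference count (rq_pin) in struct rpc_rqst.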
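Replacing a single "pinned" bit with a counter lets the send and receive paths hold independent pins on the same request. The sketch below uses C11 atomics in place of the kernel's atomic_t. The demo_* names are hypothetical, and the boolean returned by demo_unpin_rqst() marks the point where the patched kernel code calls wake_up_var().

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

struct demo_rqst {
	atomic_int pin; /* stands in for the new rq_pin field */
};

static void demo_pin_rqst(struct demo_rqst *req)
{
	atomic_fetch_add(&req->pin, 1);
}

/* Returns true when the last pin is dropped, which is when a waiter
 * (xprt_wait_on_pinned_rqst() in the patch) needs to be woken. */
static bool demo_unpin_rqst(struct demo_rqst *req)
{
	int old = atomic_fetch_sub(&req->pin, 1);

	assert(old > 0); /* catches an unbalanced unpin */
	return old == 1;
}

static bool demo_is_pinned_rqst(struct demo_rqst *req)
{
	return atomic_load(&req->pin) != 0;
}
```

With the old single bit, the first unpin would have cleared the pin even though the other path still held it. The counter only reports "unpinned" once both have let go.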

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/sched.h | 2 --
include/linux/sunrpc/xprt.h | 1 +
net/sunrpc/xprt.c | 37 +++++++++++++++++-------------------
3 files changed, 18 insertions(+), 22 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 9e655df70131..a4a42b3a1f03 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -142,8 +142,6 @@ struct rpc_task_setup {
#define RPC_TASK_ACTIVE 2
#define RPC_TASK_NEED_XMIT 3
#define RPC_TASK_NEED_RECV 4
-#define RPC_TASK_MSG_RECV 5
-#define RPC_TASK_MSG_RECV_WAIT 6

#define RPC_IS_RUNNING(t) test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
#define rpc_set_running(t) set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 3d80524e92d6..bd743c51a865 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -103,6 +103,7 @@ struct rpc_rqst {
/* A cookie used to track the
state of the transport
connection */
+ atomic_t rq_pin;

/*
* Partial send handling
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 45d580cd93ac..bf8fc1a5dbd1 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -847,16 +847,22 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

+static bool
+xprt_is_pinned_rqst(struct rpc_rqst *req)
+{
+ return atomic_read(&req->rq_pin) != 0;
+}
+
/**
* xprt_pin_rqst - Pin a request on the transport receive list
* @req: Request to pin
*
* Caller must ensure this is atomic with the call to xprt_lookup_rqst()
- * so should be holding the xprt transport lock.
+ * so should be holding the xprt receive lock.
*/
void xprt_pin_rqst(struct rpc_rqst *req)
{
- set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
+ atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);

@@ -864,31 +870,18 @@ EXPORT_SYMBOL_GPL(xprt_pin_rqst);
* xprt_unpin_rqst - Unpin a request on the transport receive list
* @req: Request to pin
*
- * Caller should be holding the xprt transport lock.
+ * Caller should be holding the xprt receive lock.
*/
void xprt_unpin_rqst(struct rpc_rqst *req)
{
- struct rpc_task *task = req->rq_task;
-
- clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate);
- if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
- wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
+ if (atomic_dec_and_test(&req->rq_pin))
+ wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
-__must_hold(&req->rq_xprt->recv_lock)
{
- struct rpc_task *task = req->rq_task;
-
- if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
- spin_unlock(&req->rq_xprt->recv_lock);
- set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
- wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
- TASK_UNINTERRUPTIBLE);
- clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
- spin_lock(&req->rq_xprt->recv_lock);
- }
+ wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}

/**
@@ -1388,7 +1381,11 @@ void xprt_release(struct rpc_task *task)
spin_lock(&xprt->recv_lock);
if (!list_empty(&req->rq_list)) {
list_del_init(&req->rq_list);
- xprt_wait_on_pinned_rqst(req);
+ if (atomic_read(&req->rq_pin)) {
+ spin_unlock(&xprt->recv_lock);
+ xprt_wait_on_pinned_rqst(req);
+ spin_lock(&xprt->recv_lock);
+ }
}
spin_unlock(&xprt->recv_lock);
spin_lock_bh(&xprt->transport_lock);
--
2.17.1

2018-09-03 19:50:44

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message

Currently, we grab the socket bit lock before we allow the message
to be XDR encoded. That significantly slows down the transmission
rate, since we serialise on a potentially blocking operation. Move
the call to xprt_prepare_transmit() so that the lock is only taken
once encoding is done.
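The new ordering in call_transmit() can be pictured with the toy model below. Encoding happens first, with no lock held, and the transport lock is taken only around the send. The lock is a plain flag, a logical clock records the order of the steps, and all demo_* names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

struct demo_xprt {
	bool locked;  /* stands in for XPRT_LOCK */
	int clock;    /* logical time used to order the steps */
	int sends;
};

struct demo_task {
	bool need_encode;
	int encoded_at;  /* logical time of XDR encoding, 0 if never */
	int locked_at;   /* logical time the lock was taken, 0 if never */
};

static void demo_encode(struct demo_xprt *xprt, struct demo_task *task)
{
	task->encoded_at = ++xprt->clock;
	task->need_encode = false;
}

/* Returns false if another task holds the lock; the caller would sleep. */
static bool demo_prepare_transmit(struct demo_xprt *xprt, struct demo_task *task)
{
	if (xprt->locked)
		return false;
	xprt->locked = true;
	task->locked_at = ++xprt->clock;
	return true;
}

/* Patched ordering: encode first (no lock held), then lock, send, unlock. */
static bool demo_call_transmit(struct demo_xprt *xprt, struct demo_task *task)
{
	if (task->need_encode)
		demo_encode(xprt, task);
	if (!demo_prepare_transmit(xprt, task))
		return false;
	xprt->sends++;
	assert(xprt->locked);
	xprt->locked = false;
	return true;
}
```

In this model, a task whose lock attempt fails has already been encoded, so encoding work overlaps with another task's transmission instead of waiting behind it.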

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/clnt.c | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index e5ac35e803ad..66ec61347716 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1949,9 +1949,6 @@ call_transmit(struct rpc_task *task)
task->tk_action = call_status;
if (task->tk_status < 0)
return;
- if (!xprt_prepare_transmit(task))
- return;
- task->tk_action = call_transmit_status;
/* Encode here so that rpcsec_gss can use correct sequence number. */
if (rpc_task_need_encode(task)) {
rpc_xdr_encode(task);
@@ -1965,8 +1962,11 @@ call_transmit(struct rpc_task *task)
return;
}
}
+ if (!xprt_prepare_transmit(task))
+ return;
+ task->tk_action = call_transmit_status;
xprt_transmit(task);
- if (task->tk_status < 0)
+ if (task->tk_status < 0)
return;
if (is_retrans)
task->tk_client->cl_stats->rpcretrans++;
--
2.17.1

2018-09-03 19:50:59

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH 21/27] SUNRPC: Don't reset the request 'bytes_sent' counter when releasing XPRT_LOCK

If the request is still on the transmit queue, resetting rq_bytes_sent
when the XPRT_LOCK is released is incorrect behaviour.
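A toy partial-send model shows why the reset is harmful once requests stay queued across lock releases. Here bytes_sent plays the role of rq_bytes_sent, and each call sends at most budget bytes into a shared stream. Resuming from bytes_sent reproduces the message exactly, while zeroing it between attempts resends the prefix. The demo_* names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

struct demo_stream {
	char buf[64];
	size_t len;
};

struct demo_rqst {
	const char *msg;
	size_t msg_len;
	size_t bytes_sent; /* stands in for rq_bytes_sent */
};

/* Send up to @budget further bytes of @req, resuming where the last
 * attempt stopped. Returns the number of bytes still outstanding. */
static size_t demo_send_some(struct demo_stream *s, struct demo_rqst *req,
			     size_t budget)
{
	size_t remaining = req->msg_len - req->bytes_sent;
	size_t n = remaining < budget ? remaining : budget;

	assert(s->len + n <= sizeof(s->buf));
	memcpy(s->buf + s->len, req->msg + req->bytes_sent, n);
	s->len += n;
	req->bytes_sent += n;
	return req->msg_len - req->bytes_sent;
}
```

If the counter is reset after a 4-byte partial send of "HELLOWORLD", the stream ends up as "HELLHELLOWORLD" (14 bytes), which the peer cannot parse as a record.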

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/clnt.c | 3 ---
net/sunrpc/xprt.c | 14 --------------
2 files changed, 17 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 497a30762a6d..585a82dfaf4d 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -2138,9 +2138,6 @@ call_status(struct rpc_task *task)
if (!task->tk_msg.rpc_proc->p_proc)
trace_xprt_ping(task->tk_xprt, task->tk_status);

- if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
- task->tk_status = req->rq_reply_bytes_recvd;
-
dprint_status(task);

status = task->tk_status;
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 3efcb69af526..5efa9eddf769 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -332,15 +332,6 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
xprt_clear_locked(xprt);
}

-static void xprt_task_clear_bytes_sent(struct rpc_task *task)
-{
- if (task != NULL) {
- struct rpc_rqst *req = task->tk_rqstp;
- if (req != NULL)
- req->rq_bytes_sent = 0;
- }
-}
-
/**
* xprt_release_xprt - allow other requests to use a transport
* @xprt: transport with other tasks potentially waiting
@@ -351,7 +342,6 @@ static void xprt_task_clear_bytes_sent(struct rpc_task *task)
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
if (xprt->snd_task == task) {
- xprt_task_clear_bytes_sent(task);
xprt_clear_locked(xprt);
__xprt_lock_write_next(xprt);
}
@@ -369,7 +359,6 @@ EXPORT_SYMBOL_GPL(xprt_release_xprt);
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
if (xprt->snd_task == task) {
- xprt_task_clear_bytes_sent(task);
xprt_clear_locked(xprt);
__xprt_lock_write_next_cong(xprt);
}
@@ -742,7 +731,6 @@ bool xprt_lock_connect(struct rpc_xprt *xprt,
goto out;
if (xprt->snd_task != task)
goto out;
- xprt_task_clear_bytes_sent(task);
xprt->snd_task = cookie;
ret = true;
out:
@@ -788,7 +776,6 @@ void xprt_connect(struct rpc_task *task)
xprt->ops->close(xprt);

if (!xprt_connected(xprt)) {
- task->tk_rqstp->rq_bytes_sent = 0;
task->tk_timeout = task->tk_rqstp->rq_timeout;
task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
@@ -1085,7 +1072,6 @@ xprt_request_enqueue_transmit(struct rpc_task *task)
static void
xprt_request_dequeue_transmit_locked(struct rpc_task *task)
{
- xprt_task_clear_bytes_sent(task);
clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
list_del_init(&task->tk_rqstp->rq_xmit);
}
--
2.17.1

2018-09-03 19:51:01

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit()

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/clnt.c | 6 ------
net/sunrpc/xprt.c | 4 ++++
2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 585a82dfaf4d..fb40d1e9f636 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1967,8 +1967,6 @@ call_connect_status(struct rpc_task *task)
static void
call_transmit(struct rpc_task *task)
{
- int is_retrans = RPC_WAS_SENT(task);
-
dprint_status(task);

task->tk_action = call_transmit_status;
@@ -1979,10 +1977,6 @@ call_transmit(struct rpc_task *task)
if (!xprt_prepare_transmit(task))
return;
xprt_transmit(task);
- if (task->tk_status < 0)
- return;
- if (is_retrans)
- task->tk_client->cl_stats->rpcretrans++;
}

/*
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 95d15d4017f7..b85e2c4fa115 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1126,6 +1126,7 @@ void xprt_transmit(struct rpc_task *task)
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
unsigned int connect_cookie;
+ int is_retrans = RPC_WAS_SENT(task);
int status;

dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
@@ -1148,6 +1149,9 @@ void xprt_transmit(struct rpc_task *task)
return;
}

+ if (is_retrans)
+ task->tk_client->cl_stats->rpcretrans++;
+
xprt_inject_disconnect(xprt);

dprintk("RPC: %5u xmit complete\n", task->tk_pid);
--
2.17.1

2018-09-03 19:50:50

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH 11/27] SUNRPC: Add a helper to wake up a sleeping rpc_task and set its status

Add a helper that will wake up a task that is sleeping on a specific
queue, and will set the value of task->tk_status. This is mainly
intended for use by the transport layer to notify the task of an
error condition.
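The refactoring turns the locked wake-up into a function that runs an optional action(task, data) callback and wakes the task only if that callback returns true. The new helper passes an action that stores the status. The following single-threaded sketch shows that pattern with hypothetical demo_* types, leaving out the queue lock and workqueue.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct demo_queue {
	const char *name;
};

struct demo_task {
	struct demo_queue *waitqueue; /* queue the task sleeps on, or NULL */
	int status;
	bool woken;
};

typedef bool (*demo_action_t)(struct demo_task *, void *);

/* Wake @task only if it is queued on @queue and @action (if any) agrees.
 * Returns the task that was woken, or NULL. */
static struct demo_task *demo_wake_up_action(struct demo_queue *queue,
					     struct demo_task *task,
					     demo_action_t action, void *data)
{
	if (queue == NULL || task->waitqueue != queue)
		return NULL;
	if (action != NULL && !action(task, data))
		return NULL;
	task->waitqueue = NULL;
	task->woken = true;
	return task;
}

static bool demo_action_set_status(struct demo_task *task, void *status)
{
	task->status = *(int *)status;
	return true;
}

static void demo_wake_up_set_status(struct demo_queue *queue,
				    struct demo_task *task, int status)
{
	demo_wake_up_action(queue, task, demo_action_set_status, &status);
}
```

Because the status is written from inside the "is it still on this queue?" check, a task that has already left the queue does not get its tk_status overwritten.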

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/sched.h | 3 ++
net/sunrpc/sched.c | 63 ++++++++++++++++++++++++++++++------
2 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index a4a42b3a1f03..c5bc779feb00 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -234,6 +234,9 @@ void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
struct rpc_task *task);
void rpc_wake_up_queued_task(struct rpc_wait_queue *,
struct rpc_task *);
+void rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *,
+ struct rpc_task *,
+ int);
void rpc_wake_up(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 3fe5d60ab0e2..104c056daf83 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -440,14 +440,28 @@ static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
/*
* Wake up a queued task while the queue lock is being held
*/
-static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
- struct rpc_wait_queue *queue, struct rpc_task *task)
+static struct rpc_task *
+rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq,
+ struct rpc_wait_queue *queue, struct rpc_task *task,
+ bool (*action)(struct rpc_task *, void *), void *data)
{
if (RPC_IS_QUEUED(task)) {
smp_rmb();
- if (task->tk_waitqueue == queue)
- __rpc_do_wake_up_task_on_wq(wq, queue, task);
+ if (task->tk_waitqueue == queue) {
+ if (action == NULL || action(task, data)) {
+ __rpc_do_wake_up_task_on_wq(wq, queue, task);
+ return task;
+ }
+ }
}
+ return NULL;
+}
+
+static void
+rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
+ struct rpc_wait_queue *queue, struct rpc_task *task)
+{
+ rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, task, NULL, NULL);
}

/*
@@ -481,6 +495,38 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);

+static bool rpc_task_action_set_status(struct rpc_task *task, void *status)
+{
+ task->tk_status = *(int *)status;
+ return true;
+}
+
+static void
+rpc_wake_up_task_queue_set_status_locked(struct rpc_wait_queue *queue,
+ struct rpc_task *task, int status)
+{
+ rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
+ task, rpc_task_action_set_status, &status);
+}
+
+/**
+ * rpc_wake_up_queued_task_set_status - wake up a task and set task->tk_status
+ * @queue: pointer to rpc_wait_queue
+ * @task: pointer to rpc_task
+ * @status: integer error value
+ *
+ * If @task is queued on @queue, then it is woken up, and @task->tk_status is
+ * set to the value of @status.
+ */
+void
+rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue,
+ struct rpc_task *task, int status)
+{
+ spin_lock_bh(&queue->lock);
+ rpc_wake_up_task_queue_set_status_locked(queue, task, status);
+ spin_unlock_bh(&queue->lock);
+}
+
/*
* Wake up the next task on a priority queue.
*/
@@ -553,12 +599,9 @@ struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
queue, rpc_qname(queue));
spin_lock_bh(&queue->lock);
task = __rpc_find_next_queued(queue);
- if (task != NULL) {
- if (func(task, data))
- rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
- else
- task = NULL;
- }
+ if (task != NULL)
+ task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue,
+ task, func, data);
spin_unlock_bh(&queue->lock);

return task;
--
2.17.1

2018-09-03 19:50:47

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH 09/27] SUNRPC: Simplify dealing with aborted partially transmitted messages

If the previous message was only partially transmitted, we need to close
the socket in order to avoid corruption of the message stream. To do so,
we currently hijack the unlocking of the socket in order to schedule
the close.

Now that we track the message offset in the socket state, we can move
that kind of checking out of the socket lock code, which is needed to
allow messages to remain queued after dropping the socket lock.
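The new check works like this. If the socket's transmit offset shows that a record was left half-written, but the request about to be sent has not sent any bytes, then that request is not the one resuming the record and the stream has to be closed. The hypothetical sketch below uses demo_* types as stand-ins for struct sock_xprt and struct rpc_rqst.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

struct demo_transport {
	size_t xmit_offset; /* bytes of the current record already on the wire */
	bool closed;
};

struct demo_rqst {
	size_t bytes_sent;  /* stands in for rq_bytes_sent */
};

/* Same predicate as xs_send_request_was_aborted() in the patch. */
static bool demo_send_request_was_aborted(const struct demo_transport *t,
					  const struct demo_rqst *req)
{
	return t->xmit_offset != 0 && req->bytes_sent == 0;
}

/* Refuse to start a new record inside a half-sent one: close the
 * stream and report -ENOTCONN to the caller. */
static int demo_send_request(struct demo_transport *t, struct demo_rqst *req)
{
	assert(t != NULL && req != NULL);
	if (demo_send_request_was_aborted(t, req)) {
		t->closed = true;
		return -ENOTCONN;
	}
	/* The actual socket send would happen here. */
	return 0;
}
```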

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/xprtsock.c | 51 +++++++++++++++++++++----------------------
1 file changed, 25 insertions(+), 26 deletions(-)

diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 629cc45e1e6c..3fbccebd0b10 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -491,6 +491,16 @@ static int xs_nospace(struct rpc_task *task)
return ret;
}

+/*
+ * Determine if the previous message in the stream was aborted before it
+ * could complete transmission.
+ */
+static bool
+xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
+{
+ return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
+}
+
/*
* Construct a stream transport record marker in @buf.
*/
@@ -522,6 +532,12 @@ static int xs_local_send_request(struct rpc_task *task)
int status;
int sent = 0;

+ /* Close the stream if the previous transmission was incomplete */
+ if (xs_send_request_was_aborted(transport, req)) {
+ xs_close(xprt);
+ return -ENOTCONN;
+ }
+
xs_encode_stream_record_marker(&req->rq_snd_buf);

xs_pktdump("packet data:",
@@ -665,6 +681,13 @@ static int xs_tcp_send_request(struct rpc_task *task)
int status;
int sent;

+ /* Close the stream if the previous transmission was incomplete */
+ if (xs_send_request_was_aborted(transport, req)) {
+ if (transport->sock != NULL)
+ kernel_sock_shutdown(transport->sock, SHUT_RDWR);
+ return -ENOTCONN;
+ }
+
xs_encode_stream_record_marker(&req->rq_snd_buf);

xs_pktdump("packet data:",
@@ -755,30 +778,6 @@ static int xs_tcp_send_request(struct rpc_task *task)
return status;
}

-/**
- * xs_tcp_release_xprt - clean up after a tcp transmission
- * @xprt: transport
- * @task: rpc task
- *
- * This cleans up if an error causes us to abort the transmission of a request.
- * In this case, the socket may need to be reset in order to avoid confusing
- * the server.
- */
-static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
-{
- struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-
- if (task != xprt->snd_task)
- return;
- if (task == NULL)
- goto out_release;
- if (transport->xmit.offset == 0 || !xprt_connected(xprt))
- goto out_release;
- set_bit(XPRT_CLOSE_WAIT, &xprt->state);
-out_release:
- xprt_release_xprt(xprt, task);
-}
-
static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
transport->old_data_ready = sk->sk_data_ready;
@@ -2764,7 +2763,7 @@ static void bc_destroy(struct rpc_xprt *xprt)

static const struct rpc_xprt_ops xs_local_ops = {
.reserve_xprt = xprt_reserve_xprt,
- .release_xprt = xs_tcp_release_xprt,
+ .release_xprt = xprt_release_xprt,
.alloc_slot = xprt_alloc_slot,
.free_slot = xprt_free_slot,
.rpcbind = xs_local_rpcbind,
@@ -2806,7 +2805,7 @@ static const struct rpc_xprt_ops xs_udp_ops = {

static const struct rpc_xprt_ops xs_tcp_ops = {
.reserve_xprt = xprt_reserve_xprt,
- .release_xprt = xs_tcp_release_xprt,
+ .release_xprt = xprt_release_xprt,
.alloc_slot = xprt_lock_and_alloc_slot,
.free_slot = xprt_free_slot,
.rpcbind = rpcb_getport_async,
--
2.17.1

2018-09-03 19:50:43

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete

Add the RPC_TASK_NEED_XMIT and RPC_TASK_NEED_RECV task states to
indicate that the message send and receive, respectively, are not yet
complete.
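The sketch below shows how the new RPC_TASK_NEED_XMIT bit drives the encode decision. Successful encoding sets the bit, rpc_task_need_encode() becomes "bit is clear", and the -EBADMSG path in call_status() clears the bit to force a re-encode. Plain bit operations stand in for the kernel's atomic set_bit()/test_bit()/clear_bit(), and the demo_* names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

enum { DEMO_TASK_NEED_XMIT = 3 }; /* bit number as in the patched sched.h */

struct demo_task {
	unsigned long runstate;
	int encode_count;
};

static bool demo_test_bit(int nr, const unsigned long *word)
{
	return (*word >> nr) & 1UL;
}

static void demo_set_bit(int nr, unsigned long *word)
{
	*word |= 1UL << nr;
}

static void demo_clear_bit(int nr, unsigned long *word)
{
	*word &= ~(1UL << nr);
}

static bool demo_need_encode(const struct demo_task *task)
{
	return !demo_test_bit(DEMO_TASK_NEED_XMIT, &task->runstate);
}

/* Mirrors rpc_xdr_encode(): on success, mark the message as ready to send. */
static void demo_xdr_encode(struct demo_task *task, int wrap_status)
{
	task->encode_count++;
	if (wrap_status == 0)
		demo_set_bit(DEMO_TASK_NEED_XMIT, &task->runstate);
}

/* Mirrors the -EBADMSG case in call_status(): force a fresh encode. */
static void demo_handle_ebadmsg(struct demo_task *task)
{
	demo_clear_bit(DEMO_TASK_NEED_XMIT, &task->runstate);
}
```

This replaces the old convention of using rq_snd_buf.len == 0 as the "needs encoding" signal, which rpc_task_force_reencode() had to maintain by hand.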

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/sched.h | 6 ++++--
net/sunrpc/clnt.c | 19 +++++++------------
net/sunrpc/xprt.c | 17 ++++++++++++++---
3 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 592653becd91..9e655df70131 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -140,8 +140,10 @@ struct rpc_task_setup {
#define RPC_TASK_RUNNING 0
#define RPC_TASK_QUEUED 1
#define RPC_TASK_ACTIVE 2
-#define RPC_TASK_MSG_RECV 3
-#define RPC_TASK_MSG_RECV_WAIT 4
+#define RPC_TASK_NEED_XMIT 3
+#define RPC_TASK_NEED_RECV 4
+#define RPC_TASK_MSG_RECV 5
+#define RPC_TASK_MSG_RECV_WAIT 6

#define RPC_IS_RUNNING(t) test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
#define rpc_set_running(t) set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index d41b5ac1d4e8..e5ac35e803ad 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1156,6 +1156,7 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
*/
xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
xbufp->tail[0].iov_len;
+ set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);

task->tk_action = call_bc_transmit;
atomic_inc(&task->tk_count);
@@ -1720,17 +1721,10 @@ call_allocate(struct rpc_task *task)
rpc_exit(task, -ERESTARTSYS);
}

-static inline int
+static int
rpc_task_need_encode(struct rpc_task *task)
{
- return task->tk_rqstp->rq_snd_buf.len == 0;
-}
-
-static inline void
-rpc_task_force_reencode(struct rpc_task *task)
-{
- task->tk_rqstp->rq_snd_buf.len = 0;
- task->tk_rqstp->rq_bytes_sent = 0;
+ return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0;
}

/*
@@ -1765,6 +1759,8 @@ rpc_xdr_encode(struct rpc_task *task)

task->tk_status = rpcauth_wrap_req(task, encode, req, p,
task->tk_msg.rpc_argp);
+ if (task->tk_status == 0)
+ set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/*
@@ -1999,7 +1995,6 @@ call_transmit_status(struct rpc_task *task)
*/
if (task->tk_status == 0) {
xprt_end_transmit(task);
- rpc_task_force_reencode(task);
return;
}

@@ -2010,7 +2005,6 @@ call_transmit_status(struct rpc_task *task)
default:
dprint_status(task);
xprt_end_transmit(task);
- rpc_task_force_reencode(task);
break;
/*
* Special cases: if we've been waiting on the
@@ -2038,7 +2032,7 @@ call_transmit_status(struct rpc_task *task)
case -EADDRINUSE:
case -ENOTCONN:
case -EPIPE:
- rpc_task_force_reencode(task);
+ break;
}
}

@@ -2185,6 +2179,7 @@ call_status(struct rpc_task *task)
rpc_exit(task, status);
break;
case -EBADMSG:
+ clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
task->tk_action = call_transmit;
break;
default:
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 3973e10ea2bd..45d580cd93ac 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -936,10 +936,18 @@ void xprt_complete_rqst(struct rpc_task *task, int copied)
/* req->rq_reply_bytes_recvd */
smp_wmb();
req->rq_reply_bytes_recvd = copied;
+ clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

+static bool
+xprt_request_data_received(struct rpc_task *task)
+{
+ return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
+ task->tk_rqstp->rq_reply_bytes_recvd != 0;
+}
+
static void xprt_timer(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
@@ -1031,12 +1039,13 @@ void xprt_transmit(struct rpc_task *task)
/* Add request to the receive list */
spin_lock(&xprt->recv_lock);
list_add_tail(&req->rq_list, &xprt->recv);
+ set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
spin_unlock(&xprt->recv_lock);
xprt_reset_majortimeo(req);
/* Turn off autodisconnect */
del_singleshot_timer_sync(&xprt->timer);
}
- } else if (!req->rq_bytes_sent)
+ } else if (xprt_request_data_received(task) && !req->rq_bytes_sent)
return;

connect_cookie = xprt->connect_cookie;
@@ -1046,9 +1055,11 @@ void xprt_transmit(struct rpc_task *task)
task->tk_status = status;
return;
}
+
xprt_inject_disconnect(xprt);

dprintk("RPC: %5u xmit complete\n", task->tk_pid);
+ clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
task->tk_flags |= RPC_TASK_SENT;
spin_lock_bh(&xprt->transport_lock);

@@ -1062,14 +1073,14 @@ void xprt_transmit(struct rpc_task *task)
spin_unlock_bh(&xprt->transport_lock);

req->rq_connect_cookie = connect_cookie;
- if (rpc_reply_expected(task) && !READ_ONCE(req->rq_reply_bytes_recvd)) {
+ if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
/*
* Sleep on the pending queue if we're expecting a reply.
* The spinlock ensures atomicity between the test of
* req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
*/
spin_lock(&xprt->recv_lock);
- if (!req->rq_reply_bytes_recvd) {
+ if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
rpc_sleep_on(&xprt->pending, task, xprt_timer);
/*
* Send an extra queue wakeup call if the
--
2.17.1
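The two new `tk_runstate` bits in PATCH 04 separate two pending states: "encoded but not yet sent" (`RPC_TASK_NEED_XMIT`) and "sent but reply not yet received" (`RPC_TASK_NEED_RECV`). Below is a minimal single-threaded userspace sketch of that lifecycle. It uses C11 atomics in place of the kernel's `set_bit()`/`test_bit()`/`clear_bit()` and omits memory barriers. The bit numbers match the patch, but every `model_*` name is hypothetical. The two predicates only mirror `rpc_task_need_encode()` and `xprt_request_data_received()` as they appear in the diff.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Bit numbers taken from the patch; the rest of this model is hypothetical. */
enum { MODEL_NEED_XMIT = 3, MODEL_NEED_RECV = 4 };

struct model_task {
	atomic_ulong runstate;          /* stands in for tk_runstate */
	unsigned int reply_bytes_recvd; /* stands in for rq_reply_bytes_recvd */
};

static void model_init(struct model_task *t)
{
	atomic_init(&t->runstate, 0UL);
	t->reply_bytes_recvd = 0;
}

static void model_set(struct model_task *t, int bit)
{
	atomic_fetch_or(&t->runstate, 1UL << bit);
}

static void model_clear(struct model_task *t, int bit)
{
	atomic_fetch_and(&t->runstate, ~(1UL << bit));
}

static bool model_test(struct model_task *t, int bit)
{
	return (atomic_load(&t->runstate) >> bit) & 1UL;
}

/* Like rpc_task_need_encode(): encode only if nothing is waiting to go out. */
static bool model_need_encode(struct model_task *t)
{
	return !model_test(t, MODEL_NEED_XMIT);
}

/* Successful XDR encode: the message must now be transmitted. */
static void model_encoded(struct model_task *t)
{
	model_set(t, MODEL_NEED_XMIT);
}

/* Request added to the receive list: a reply is now expected. */
static void model_enqueue_receive(struct model_task *t)
{
	model_set(t, MODEL_NEED_RECV);
}

/* Transmission done; in this model only an encoded message may be sent. */
static void model_transmitted(struct model_task *t)
{
	assert(model_test(t, MODEL_NEED_XMIT));
	model_clear(t, MODEL_NEED_XMIT);
}

/* Like xprt_complete_rqst(): record the reply, then clear NEED_RECV. */
static void model_complete(struct model_task *t, unsigned int copied)
{
	t->reply_bytes_recvd = copied;
	model_clear(t, MODEL_NEED_RECV);
}

/* Like xprt_request_data_received(). */
static bool model_data_received(struct model_task *t)
{
	return !model_test(t, MODEL_NEED_RECV) && t->reply_bytes_recvd != 0;
}
```

Once a message has been transmitted, `NEED_XMIT` is clear again, so a later pass through `call_transmit()` re-encodes before resending. That is the job the removed `rpc_task_force_reencode()` used to do by zeroing `rq_snd_buf.len`.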

2018-09-03 19:50:53

by Trond Myklebust

Subject: [PATCH 14/27] SUNRPC: Refactor xprt_transmit() to remove the reply queue code

Separate out the action of adding a request to the reply queue so that the
backchannel code can simply skip calling it altogether.

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/xprt.h | 1 +
net/sunrpc/clnt.c | 5 ++
net/sunrpc/xprt.c | 100 ++++++++++++++++++++++--------------
3 files changed, 68 insertions(+), 38 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index c25d0a5fda69..0250294c904a 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -334,6 +334,7 @@ void xprt_free_slot(struct rpc_xprt *xprt,
struct rpc_rqst *req);
void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
bool xprt_prepare_transmit(struct rpc_task *task);
+void xprt_request_enqueue_receive(struct rpc_task *task);
void xprt_transmit(struct rpc_task *task);
void xprt_end_transmit(struct rpc_task *task);
int xprt_adjust_timeout(struct rpc_rqst *req);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 66ec61347716..3d6d1b5f9e81 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1962,6 +1962,11 @@ call_transmit(struct rpc_task *task)
return;
}
}
+
+ /* Add task to reply queue before transmission to avoid races */
+ if (rpc_reply_expected(task))
+ xprt_request_enqueue_receive(task);
+
if (!xprt_prepare_transmit(task))
return;
task->tk_action = call_transmit_status;
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index eda305de9f77..cb3c0f7d5b3d 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -884,6 +884,57 @@ static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}

+static bool
+xprt_request_data_received(struct rpc_task *task)
+{
+ return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
+ task->tk_rqstp->rq_reply_bytes_recvd != 0;
+}
+
+/**
+ * xprt_request_enqueue_receive - Add a request to the receive queue
+ * @task: RPC task
+ *
+ */
+void
+xprt_request_enqueue_receive(struct rpc_task *task)
+{
+ struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
+
+ spin_lock(&xprt->queue_lock);
+ if (xprt_request_data_received(task) || !list_empty(&req->rq_list)) {
+ spin_unlock(&xprt->queue_lock);
+ return;
+ }
+
+ /* Update the softirq receive buffer */
+ memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
+ sizeof(req->rq_private_buf));
+
+ /* Add request to the receive list */
+ list_add_tail(&req->rq_list, &xprt->recv);
+ set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
+ spin_unlock(&xprt->queue_lock);
+
+ xprt_reset_majortimeo(req);
+ /* Turn off autodisconnect */
+ del_singleshot_timer_sync(&xprt->timer);
+}
+
+/**
+ * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
+ * @task: RPC task
+ *
+ * Caller must hold xprt->queue_lock.
+ */
+static void
+xprt_request_dequeue_receive_locked(struct rpc_task *task)
+{
+ clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
+ list_del_init(&task->tk_rqstp->rq_list);
+}
+
/**
* xprt_update_rtt - Update RPC RTT statistics
* @task: RPC request that recently completed
@@ -923,24 +974,16 @@ void xprt_complete_rqst(struct rpc_task *task, int copied)

xprt->stat.recvs++;

- list_del_init(&req->rq_list);
req->rq_private_buf.len = copied;
/* Ensure all writes are done before we update */
/* req->rq_reply_bytes_recvd */
smp_wmb();
req->rq_reply_bytes_recvd = copied;
- clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
+ xprt_request_dequeue_receive_locked(task);
rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

-static bool
-xprt_request_data_received(struct rpc_task *task)
-{
- return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
- task->tk_rqstp->rq_reply_bytes_recvd != 0;
-}
-
static void xprt_timer(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
@@ -1014,32 +1057,15 @@ void xprt_transmit(struct rpc_task *task)

dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

- if (!req->rq_reply_bytes_recvd) {
-
+ if (!req->rq_bytes_sent) {
+ if (xprt_request_data_received(task))
+ return;
/* Verify that our message lies in the RPCSEC_GSS window */
- if (!req->rq_bytes_sent && rpcauth_xmit_need_reencode(task)) {
+ if (rpcauth_xmit_need_reencode(task)) {
task->tk_status = -EBADMSG;
return;
}
-
- if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
- /*
- * Add to the list only if we're expecting a reply
- */
- /* Update the softirq receive buffer */
- memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
- sizeof(req->rq_private_buf));
- /* Add request to the receive list */
- spin_lock(&xprt->queue_lock);
- list_add_tail(&req->rq_list, &xprt->recv);
- set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
- spin_unlock(&xprt->queue_lock);
- xprt_reset_majortimeo(req);
- /* Turn off autodisconnect */
- del_singleshot_timer_sync(&xprt->timer);
- }
- } else if (xprt_request_data_received(task) && !req->rq_bytes_sent)
- return;
+ }

connect_cookie = xprt->connect_cookie;
status = xprt->ops->send_request(task);
@@ -1376,13 +1402,11 @@ void xprt_release(struct rpc_task *task)
else if (task->tk_client)
rpc_count_iostats(task, task->tk_client->cl_metrics);
spin_lock(&xprt->queue_lock);
- if (!list_empty(&req->rq_list)) {
- list_del_init(&req->rq_list);
- if (atomic_read(&req->rq_pin)) {
- spin_unlock(&xprt->queue_lock);
- xprt_wait_on_pinned_rqst(req);
- spin_lock(&xprt->queue_lock);
- }
+ xprt_request_dequeue_receive_locked(task);
+ while (xprt_is_pinned_rqst(req)) {
+ spin_unlock(&xprt->queue_lock);
+ xprt_wait_on_pinned_rqst(req);
+ spin_lock(&xprt->queue_lock);
}
spin_unlock(&xprt->queue_lock);
spin_lock_bh(&xprt->transport_lock);
--
2.17.1
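PATCH 14 calls `xprt_request_enqueue_receive()` before `xprt_prepare_transmit()`. As a result, the request is already on the receive queue by the time any of its bytes reach the wire, so even a very fast reply can be matched. The sketch below models that ordering in userspace C11. A busy-wait lock on an `atomic_bool` stands in for `xprt->queue_lock`, a tiny circular list stands in for `list_head`, and all `model_*` and `list_*` names are hypothetical. It mirrors two properties visible in the diff. First, enqueueing is idempotent: a retransmission does not add the request twice, and a request whose reply is already in is not re-queued. Second, dequeueing is safe on a request that is not queued, which is what lets `xprt_release()` drop its `list_empty()` check.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Minimal circular doubly linked list in the style of struct list_head. */
struct list_node {
	struct list_node *next, *prev;
};

static void list_init(struct list_node *n) { n->next = n->prev = n; }
static bool list_is_empty(const struct list_node *n) { return n->next == n; }

static void list_append(struct list_node *n, struct list_node *head)
{
	n->prev = head->prev;
	n->next = head;
	head->prev->next = n;
	head->prev = n;
}

/* Unlink and re-initialise; harmless on a node that is not on a list. */
static void list_remove_init(struct list_node *n)
{
	n->prev->next = n->next;
	n->next->prev = n->prev;
	list_init(n);
}

/* Busy-wait lock standing in for spin_lock(&xprt->queue_lock). */
static void spin_acquire(atomic_bool *l)
{
	while (atomic_exchange_explicit(l, true, memory_order_acquire))
		;
}

static void spin_release(atomic_bool *l)
{
	atomic_store_explicit(l, false, memory_order_release);
}

struct model_xprt {
	atomic_bool queue_lock;
	struct list_node recv_queue;
};

struct model_req {
	struct list_node recv;     /* link on model_xprt.recv_queue */
	bool need_recv;            /* stands in for RPC_TASK_NEED_RECV */
	unsigned int reply_bytes;  /* stands in for rq_reply_bytes_recvd */
};

static void model_xprt_init(struct model_xprt *x)
{
	atomic_init(&x->queue_lock, false);
	list_init(&x->recv_queue);
}

static void model_req_init(struct model_req *r)
{
	list_init(&r->recv);
	r->need_recv = false;
	r->reply_bytes = 0;
}

static bool model_data_received(const struct model_req *r)
{
	return !r->need_recv && r->reply_bytes != 0;
}

/* Call before transmitting; a no-op if already queued or already answered. */
static void model_enqueue_receive(struct model_xprt *x, struct model_req *r)
{
	spin_acquire(&x->queue_lock);
	if (!model_data_received(r) && list_is_empty(&r->recv)) {
		list_append(&r->recv, &x->recv_queue);
		r->need_recv = true;
	}
	spin_release(&x->queue_lock);
}

/* Caller holds queue_lock (cf. xprt_request_dequeue_receive_locked()). */
static void model_dequeue_receive_locked(struct model_req *r)
{
	r->need_recv = false;
	list_remove_init(&r->recv);
}

/* Reply path: record the reply, then take the request off the queue. */
static void model_complete(struct model_xprt *x, struct model_req *r,
			   unsigned int copied)
{
	spin_acquire(&x->queue_lock);
	assert(r->need_recv); /* replies can only match queued requests */
	r->reply_bytes = copied;
	model_dequeue_receive_locked(r);
	spin_release(&x->queue_lock);
}
```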

2018-09-03 19:50:55

by Trond Myklebust

Subject: [PATCH 17/27] SUNRPC: Distinguish between the slot allocation list and receive queue

When storing a struct rpc_rqst on the slot allocation list, we currently
use the same field 'rq_list' as we use to store the request on the
receive queue. Since the structure is never on both lists at the same
time, this is OK.
However, for clarity, let's make that a union with different names for
the different lists so that we can more easily distinguish between
the two states.

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/xprt.h | 9 +++++++--
net/sunrpc/backchannel_rqst.c | 2 +-
net/sunrpc/xprt.c | 16 ++++++++--------
net/sunrpc/xprtrdma/backchannel.c | 2 +-
4 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 4fa2af087cff..9cec2d0811f2 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -82,7 +82,11 @@ struct rpc_rqst {
struct page **rq_enc_pages; /* scratch pages for use by
gss privacy code */
void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
- struct list_head rq_list;
+
+ union {
+ struct list_head rq_list; /* Slot allocation list */
+ struct list_head rq_recv; /* Receive queue */
+ };

void *rq_buffer; /* Call XDR encode buffer */
size_t rq_callsize;
@@ -249,7 +253,8 @@ struct rpc_xprt {
struct list_head bc_pa_list; /* List of preallocated
* backchannel rpc_rqst's */
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
- struct list_head recv;
+
+ struct list_head recv_queue; /* Receive queue */

struct {
unsigned long bind_count, /* total number of binds */
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 3c15a99b9700..92e9ad30ec2f 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -91,7 +91,7 @@ struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt, gfp_t gfp_flags)
return NULL;

req->rq_xprt = xprt;
- INIT_LIST_HEAD(&req->rq_list);
+ INIT_LIST_HEAD(&req->rq_recv);
INIT_LIST_HEAD(&req->rq_bc_list);

/* Preallocate one XDR receive buffer */
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index aefa23c90a2b..0184a6aed7e3 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -708,7 +708,7 @@ static void
xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
__must_hold(&xprt->transport_lock)
{
- if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
+ if (list_empty(&xprt->recv_queue) && xprt_has_timer(xprt))
mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
}

@@ -718,7 +718,7 @@ xprt_init_autodisconnect(struct timer_list *t)
struct rpc_xprt *xprt = from_timer(xprt, t, timer);

spin_lock(&xprt->transport_lock);
- if (!list_empty(&xprt->recv))
+ if (!list_empty(&xprt->recv_queue))
goto out_abort;
/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
xprt->last_used = jiffies;
@@ -848,7 +848,7 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
struct rpc_rqst *entry;

- list_for_each_entry(entry, &xprt->recv, rq_list)
+ list_for_each_entry(entry, &xprt->recv_queue, rq_recv)
if (entry->rq_xid == xid) {
trace_xprt_lookup_rqst(xprt, xid, 0);
entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
@@ -919,7 +919,7 @@ xprt_request_enqueue_receive(struct rpc_task *task)
struct rpc_xprt *xprt = req->rq_xprt;

spin_lock(&xprt->queue_lock);
- if (xprt_request_data_received(task) || !list_empty(&req->rq_list)) {
+ if (xprt_request_data_received(task) || !list_empty(&req->rq_recv)) {
spin_unlock(&xprt->queue_lock);
return;
}
@@ -929,7 +929,7 @@ xprt_request_enqueue_receive(struct rpc_task *task)
sizeof(req->rq_private_buf));

/* Add request to the receive list */
- list_add_tail(&req->rq_list, &xprt->recv);
+ list_add_tail(&req->rq_recv, &xprt->recv_queue);
set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
spin_unlock(&xprt->queue_lock);

@@ -948,7 +948,7 @@ static void
xprt_request_dequeue_receive_locked(struct rpc_task *task)
{
clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
- list_del_init(&task->tk_rqstp->rq_list);
+ list_del_init(&task->tk_rqstp->rq_recv);
}

/**
@@ -1337,7 +1337,7 @@ xprt_request_init(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req = task->tk_rqstp;

- INIT_LIST_HEAD(&req->rq_list);
+ INIT_LIST_HEAD(&req->rq_recv);
req->rq_timeout = task->tk_client->cl_timeout->to_initval;
req->rq_task = task;
req->rq_xprt = xprt;
@@ -1471,7 +1471,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
spin_lock_init(&xprt->queue_lock);

INIT_LIST_HEAD(&xprt->free);
- INIT_LIST_HEAD(&xprt->recv);
+ INIT_LIST_HEAD(&xprt->recv_queue);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
spin_lock_init(&xprt->bc_pa_lock);
INIT_LIST_HEAD(&xprt->bc_pa_list);
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 90adeff4c06b..40c7c7306a99 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -51,7 +51,7 @@ static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
rqst = &req->rl_slot;

rqst->rq_xprt = xprt;
- INIT_LIST_HEAD(&rqst->rq_list);
+ INIT_LIST_HEAD(&rqst->rq_recv);
INIT_LIST_HEAD(&rqst->rq_bc_list);
__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
spin_lock_bh(&xprt->bc_pa_lock);
--
2.17.1
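The anonymous union in PATCH 17 gives one `list_head` two names. Both names alias the same bytes, so the change adds no memory. It stays safe only because a request is never on the slot list and the receive queue at the same time. The sketch below is a hypothetical userspace model of that layout. It checks the aliasing at compile time, takes a slot off the free list by the `rq_list` name, re-initialises the same storage by the `rq_recv` name (as `xprt_request_init()` does in the diff), and then walks the receive queue by `rq_recv`, roughly as `xprt_lookup_rqst()` does. The `model_*` and `list_*` helpers are assumptions, not kernel APIs.

```c
#include <assert.h>
#include <stddef.h>

/* Minimal list node in the style of struct list_head. */
struct list_node {
	struct list_node *next, *prev;
};

#define model_container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct model_rqst {
	union {                           /* C11 anonymous union */
		struct list_node rq_list; /* slot allocation list */
		struct list_node rq_recv; /* receive queue */
	};
	unsigned int rq_xid;
};

/* Both names alias the same storage. */
_Static_assert(offsetof(struct model_rqst, rq_list) ==
	       offsetof(struct model_rqst, rq_recv),
	       "rq_list and rq_recv must share storage");

static void list_init(struct list_node *n)
{
	n->next = n->prev = n;
}

static void list_append(struct list_node *n, struct list_node *head)
{
	n->prev = head->prev;
	n->next = head;
	head->prev->next = n;
	head->prev = n;
}

static void list_unlink(struct list_node *n)
{
	n->prev->next = n->next;
	n->next->prev = n->prev;
}

/* Leave the free-slot list first: only one of the two lists at a time. */
static void model_take_slot(struct model_rqst *r)
{
	list_unlink(&r->rq_list);  /* off the slot allocation list */
	list_init(&r->rq_recv);    /* same bytes, now a receive-queue link */
}

/* Walk the receive queue by the rq_recv name and match on XID. */
static struct model_rqst *model_lookup(struct list_node *recv_queue,
				       unsigned int xid)
{
	for (struct list_node *n = recv_queue->next; n != recv_queue; n = n->next) {
		struct model_rqst *r =
			model_container_of(n, struct model_rqst, rq_recv);
		if (r->rq_xid == xid)
			return r;
	}
	return NULL;
}
```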

2018-09-03 19:51:00

by Trond Myklebust

Subject: [PATCH 22/27] SUNRPC: Simplify xprt_prepare_transmit()

Remove the checks for whether or not we need to transmit, and whether
or not a reply has been received. Those are already handled in
call_transmit() itself.

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/xprt.c | 21 +++------------------
1 file changed, 3 insertions(+), 18 deletions(-)

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 5efa9eddf769..95d15d4017f7 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1102,27 +1102,12 @@ bool xprt_prepare_transmit(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
- bool ret = false;

dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);

- spin_lock_bh(&xprt->transport_lock);
- if (!req->rq_bytes_sent) {
- if (req->rq_reply_bytes_recvd) {
- task->tk_status = req->rq_reply_bytes_recvd;
- goto out_unlock;
- }
- if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
- goto out_unlock;
- }
- if (!xprt->ops->reserve_xprt(xprt, task)) {
- task->tk_status = -EAGAIN;
- goto out_unlock;
- }
- ret = true;
-out_unlock:
- spin_unlock_bh(&xprt->transport_lock);
- return ret;
+ if (!xprt_lock_write(xprt, task))
+ return false;
+ return true;
}

void xprt_end_transmit(struct rpc_task *task)
--
2.17.1

2018-09-03 19:51:05

by Trond Myklebust

Subject: [PATCH 27/27] SUNRPC: Convert the xprt->sending queue back to an ordinary wait queue

We no longer need priority semantics on the xprt->sending queue, because
the order in which tasks are sent is now dictated by their position in
the send queue.
Note that the backlog queue remains a priority queue, meaning that
slot resources are still managed in order of task priority.

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/xprt.c | 29 ++++++-----------------------
1 file changed, 6 insertions(+), 23 deletions(-)

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 1ce32e555c9b..730e8bbd63a9 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -183,7 +183,6 @@ EXPORT_SYMBOL_GPL(xprt_load_transport);
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
- int priority;

if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
if (task == xprt->snd_task)
@@ -201,13 +200,7 @@ int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
task->tk_pid, xprt);
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
- if (req == NULL)
- priority = RPC_PRIORITY_LOW;
- else if (!req->rq_ntrans)
- priority = RPC_PRIORITY_NORMAL;
- else
- priority = RPC_PRIORITY_HIGH;
- rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
+ rpc_sleep_on(&xprt->sending, task, NULL);
return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
@@ -234,7 +227,6 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
- int priority;

if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
if (task == xprt->snd_task)
@@ -257,13 +249,7 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
- if (req == NULL)
- priority = RPC_PRIORITY_LOW;
- else if (!req->rq_ntrans)
- priority = RPC_PRIORITY_NORMAL;
- else
- priority = RPC_PRIORITY_HIGH;
- rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
+ rpc_sleep_on(&xprt->sending, task, NULL);
return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
@@ -295,8 +281,7 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return;

- if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
- __xprt_lock_write_func, xprt))
+ if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt))
return;
xprt_clear_locked(xprt);
}
@@ -325,8 +310,7 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
return;
if (RPCXPRT_CONGESTED(xprt))
goto out_unlock;
- if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
- __xprt_lock_write_cong_func, xprt))
+ if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt))
return;
out_unlock:
xprt_clear_locked(xprt);
@@ -506,8 +490,7 @@ void xprt_write_space(struct rpc_xprt *xprt)
if (xprt->snd_task) {
dprintk("RPC: write space: waking waiting task on "
"xprt %p\n", xprt);
- rpc_wake_up_queued_task_on_wq(xprtiod_workqueue,
- &xprt->pending, xprt->snd_task);
+ rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
}
spin_unlock_bh(&xprt->transport_lock);
}
@@ -1573,7 +1556,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)

rpc_init_wait_queue(&xprt->binding, "xprt_binding");
rpc_init_wait_queue(&xprt->pending, "xprt_pending");
- rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");
+ rpc_init_wait_queue(&xprt->sending, "xprt_sending");
rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

xprt_init_xid(xprt);
--
2.17.1
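For reference, PATCH 27 deletes the same priority rule from both `xprt_reserve_xprt()` and `xprt_reserve_xprt_cong()`. Restated as a standalone function, the rule was:

- A task without a request slot waited at low priority.
- A request that had never been transmitted waited at normal priority.
- A retransmission (`rq_ntrans != 0`) waited at high priority.

The commit message states that sending order is now set by position in the send queue, so the patch replaces `rpc_sleep_on_priority()` with a plain `rpc_sleep_on()`. The enum values below are illustrative rather than the kernel's `RPC_PRIORITY_*` constants, and `model_sending_priority()` is a hypothetical name.

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative values; not the kernel's RPC_PRIORITY_* numbering. */
enum model_priority {
	MODEL_PRIO_LOW,
	MODEL_PRIO_NORMAL,
	MODEL_PRIO_HIGH,
};

/*
 * Restates the logic removed by this patch:
 *   has_req - the task owns an rpc_rqst (req != NULL)
 *   ntrans  - how many times that request has been transmitted
 */
static enum model_priority model_sending_priority(bool has_req,
						  unsigned int ntrans)
{
	if (!has_req)
		return MODEL_PRIO_LOW;
	if (ntrans == 0)
		return MODEL_PRIO_NORMAL;
	return MODEL_PRIO_HIGH;
}
```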

2018-09-03 19:51:04

by Trond Myklebust

Subject: [PATCH 25/27] SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit queue

Rather than forcing each and every RPC task to grab the socket write
lock in order to send itself, we allow whichever task is holding the
write lock to attempt to drain the entire transmit queue.

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/xprt.c | 82 +++++++++++++++++++++++++++++++++++++++--------
1 file changed, 69 insertions(+), 13 deletions(-)

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index b85e2c4fa115..1ce32e555c9b 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1116,15 +1116,20 @@ void xprt_end_transmit(struct rpc_task *task)
}

/**
- * xprt_transmit - send an RPC request on a transport
- * @task: controlling RPC task
+ * xprt_request_transmit - send an RPC request on a transport
+ * @req: pointer to request to transmit
+ * @snd_task: RPC task that owns the transport lock
*
- * We have to copy the iovec because sendmsg fiddles with its contents.
+ * This performs the transmission of a single request.
+ * Note that if the request is not the same as snd_task, then it
+ * does need to be pinned.
+ * Returns '0' on success.
*/
-void xprt_transmit(struct rpc_task *task)
+static int
+xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{
- struct rpc_rqst *req = task->tk_rqstp;
- struct rpc_xprt *xprt = req->rq_xprt;
+ struct rpc_xprt *xprt = req->rq_xprt;
+ struct rpc_task *task = req->rq_task;
unsigned int connect_cookie;
int is_retrans = RPC_WAS_SENT(task);
int status;
@@ -1132,22 +1137,25 @@ void xprt_transmit(struct rpc_task *task)
dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

if (!req->rq_bytes_sent) {
- if (xprt_request_data_received(task))
+ if (xprt_request_data_received(task)) {
+ status = 0;
goto out_dequeue;
+ }
/* Verify that our message lies in the RPCSEC_GSS window */
if (rpcauth_xmit_need_reencode(task)) {
- task->tk_status = -EBADMSG;
+ status = -EBADMSG;
goto out_dequeue;
}
}

connect_cookie = xprt->connect_cookie;
- status = xprt->ops->send_request(req, task);
+ status = xprt->ops->send_request(req, snd_task);
trace_xprt_transmit(xprt, req->rq_xid, status);
- if (status != 0) {
- task->tk_status = status;
- return;
- }
+ if (status != 0)
+ return status;
+
+ if (is_retrans)
+ task->tk_client->cl_stats->rpcretrans++;

if (is_retrans)
task->tk_client->cl_stats->rpcretrans++;
@@ -1168,6 +1176,54 @@ void xprt_transmit(struct rpc_task *task)
req->rq_connect_cookie = connect_cookie;
out_dequeue:
xprt_request_dequeue_transmit(task);
+ rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
+ return status;
+}
+
+/**
+ * xprt_transmit - send an RPC request on a transport
+ * @task: controlling RPC task
+ *
+ * Attempts to drain the transmit queue. On exit, either the transport
+ * signalled an error that needs to be handled before transmission can
+ * resume, or @task finished transmitting, and detected that it already
+ * received a reply.
+ */
+void
+xprt_transmit(struct rpc_task *task)
+{
+ struct rpc_rqst *next, *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
+ LIST_HEAD(head);
+ int status;
+
+ task->tk_status = -EAGAIN;
+ spin_lock(&xprt->queue_lock);
+ /* Avoid livelock by moving the xmit_queue contents to a private list */
+ list_splice_init(&xprt->xmit_queue, &head);
+ while (!list_empty(&head)) {
+ next = list_first_entry(&head, struct rpc_rqst, rq_xmit);
+ xprt_pin_rqst(next);
+ spin_unlock(&xprt->queue_lock);
+ status = xprt_request_transmit(next, task);
+ if (status == -EBADMSG && next != req)
+ status = 0;
+ cond_resched();
+ spin_lock(&xprt->queue_lock);
+ xprt_unpin_rqst(next);
+ if (status == 0) {
+ if (!xprt_request_data_received(task) ||
+ test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+ continue;
+ } else if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+ rpc_wake_up_queued_task(&xprt->pending, task);
+ else
+ task->tk_status = status;
+ /* On early exit, splice back the list contents */
+ list_splice(&head, &xprt->xmit_queue);
+ break;
+ }
+ spin_unlock(&xprt->queue_lock);
}

static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
--
2.17.1
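The core of PATCH 25 is the splice-and-drain loop in the new `xprt_transmit()`. The sketch below keeps only its queue mechanics, as single-threaded userspace C11:

1. Move the whole transmit queue onto a private list.
2. Send the requests in order.
3. On the first send error, splice the unsent remainder back onto the front of the queue.

It deliberately omits the locking, request pinning, the `-EBADMSG` special case, and the early exit once the lock owner's own reply has arrived. `model_drain()`, `send_status`, and the list helpers are hypothetical stand-ins, not kernel APIs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct list_node {
	struct list_node *next, *prev;
};

#define model_container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

static void list_init(struct list_node *n) { n->next = n->prev = n; }
static bool list_is_empty(const struct list_node *h) { return h->next == h; }

static void list_append(struct list_node *n, struct list_node *head)
{
	n->prev = head->prev;
	n->next = head;
	head->prev->next = n;
	head->prev = n;
}

static void list_remove_init(struct list_node *n)
{
	n->prev->next = n->next;
	n->next->prev = n->prev;
	list_init(n);
}

/* Move all of @from to the front of @to and leave @from empty
 * (the effect of the kernel's list_splice_init()). */
static void list_splice_front_init(struct list_node *from, struct list_node *to)
{
	struct list_node *first = from->next, *last = from->prev, *at = to->next;

	if (list_is_empty(from))
		return;
	first->prev = to;
	to->next = first;
	last->next = at;
	at->prev = last;
	list_init(from);
}

struct model_req {
	struct list_node xmit;  /* link on the transmit queue */
	int send_status;        /* what the fake transport returns */
	bool sent;
};

/* Returns 0 once the snapshot is drained, else the first send error. */
static int model_drain(struct list_node *xmit_queue)
{
	struct list_node head;
	int status = 0;

	list_init(&head);
	/* Work on a private snapshot: requests queued while draining wait
	 * for the next pass instead of keeping this loop alive (livelock). */
	list_splice_front_init(xmit_queue, &head);
	while (!list_is_empty(&head)) {
		struct model_req *r =
			model_container_of(head.next, struct model_req, xmit);

		status = r->send_status;
		if (status != 0)
			break;  /* leave r queued for a later attempt */
		r->sent = true;
		list_remove_init(&r->xmit);
	}
	/* Early exit: put the unsent remainder back, ahead of newer arrivals. */
	list_splice_front_init(&head, xmit_queue);
	return status;
}
```

The remainder is spliced back to the front rather than appended. This keeps requests that were already in line ahead of anything queued while the lock owner was sending, which matches the `list_splice(&head, &xprt->xmit_queue)` call in the patch.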

2018-09-03 21:32:13

by Chuck Lever

Subject: Re: [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message



> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <[email protected]> wrote:
>
> Currently, we grab the socket bit lock before we allow the message
> to be XDR encoded. That significantly slows down the transmission
> rate, since we serialise on a potentially blocking operation.

Which operation blocks, and how often?


> Signed-off-by: Trond Myklebust <[email protected]>
> ---
> net/sunrpc/clnt.c | 8 ++++----
> 1 file changed, 4 insertions(+), 4 deletions(-)
>
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index e5ac35e803ad..66ec61347716 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -1949,9 +1949,6 @@ call_transmit(struct rpc_task *task)
> task->tk_action = call_status;
> if (task->tk_status < 0)
> return;
> - if (!xprt_prepare_transmit(task))
> - return;
> - task->tk_action = call_transmit_status;
> /* Encode here so that rpcsec_gss can use correct sequence number. */
> if (rpc_task_need_encode(task)) {
> rpc_xdr_encode(task);
> @@ -1965,8 +1962,11 @@ call_transmit(struct rpc_task *task)
> return;
> }
> }
> + if (!xprt_prepare_transmit(task))
> + return;
> + task->tk_action = call_transmit_status;
> xprt_transmit(task);
> - if (task->tk_status < 0)
> + if (task->tk_status < 0) {

The added curly bracket seems incorrect.


> return;
> if (is_retrans)
> task->tk_client->cl_stats->rpcretrans++;
> --
> 2.17.1
>

--
Chuck Lever
[email protected]

2018-09-03 21:36:16

by Chuck Lever

Subject: Re: [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete



> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <[email protected]> wrote:
>
> Add states to indicate that the message send and receive are not yet
> complete.

In general, the context provided by the cover letter is lost
since it does not appear in the git commit log. Some of the
patch descriptions in this series, like this one, fail to
explain "why", so there's no guidance at all to folks looking
through these patches after they're merged.

So I'm not sure if this is a clean up patch or a pre-requisite
and why it might be necessary.


> Signed-off-by: Trond Myklebust <[email protected]>
> ---
> include/linux/sunrpc/sched.h | 6 ++++--
> net/sunrpc/clnt.c | 19 +++++++------------
> net/sunrpc/xprt.c | 17 ++++++++++++++---
> 3 files changed, 25 insertions(+), 17 deletions(-)
>
> diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
> index 592653becd91..9e655df70131 100644
> --- a/include/linux/sunrpc/sched.h
> +++ b/include/linux/sunrpc/sched.h
> @@ -140,8 +140,10 @@ struct rpc_task_setup {
> #define RPC_TASK_RUNNING 0
> #define RPC_TASK_QUEUED 1
> #define RPC_TASK_ACTIVE 2
> -#define RPC_TASK_MSG_RECV 3
> -#define RPC_TASK_MSG_RECV_WAIT 4
> +#define RPC_TASK_NEED_XMIT 3
> +#define RPC_TASK_NEED_RECV 4
> +#define RPC_TASK_MSG_RECV 5
> +#define RPC_TASK_MSG_RECV_WAIT 6
>
> #define RPC_IS_RUNNING(t) test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
> #define rpc_set_running(t) set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index d41b5ac1d4e8..e5ac35e803ad 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -1156,6 +1156,7 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
> */
> xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
> xbufp->tail[0].iov_len;
> + set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
>
> task->tk_action = call_bc_transmit;
> atomic_inc(&task->tk_count);
> @@ -1720,17 +1721,10 @@ call_allocate(struct rpc_task *task)
> rpc_exit(task, -ERESTARTSYS);
> }
>
> -static inline int
> +static int

Nit: perhaps this function should return a bool type.


> rpc_task_need_encode(struct rpc_task *task)
> {
> - return task->tk_rqstp->rq_snd_buf.len == 0;
> -}
> -
> -static inline void
> -rpc_task_force_reencode(struct rpc_task *task)
> -{
> - task->tk_rqstp->rq_snd_buf.len = 0;
> - task->tk_rqstp->rq_bytes_sent = 0;
> + return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0;
> }
>
> /*
> @@ -1765,6 +1759,8 @@ rpc_xdr_encode(struct rpc_task *task)
>
> task->tk_status = rpcauth_wrap_req(task, encode, req, p,
> task->tk_msg.rpc_argp);
> + if (task->tk_status == 0)
> + set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
> }
>
> /*
> @@ -1999,7 +1995,6 @@ call_transmit_status(struct rpc_task *task)
> */
> if (task->tk_status == 0) {
> xprt_end_transmit(task);
> - rpc_task_force_reencode(task);
> return;
> }
>
> @@ -2010,7 +2005,6 @@ call_transmit_status(struct rpc_task *task)
> default:
> dprint_status(task);
> xprt_end_transmit(task);
> - rpc_task_force_reencode(task);
> break;
> /*
> * Special cases: if we've been waiting on the
> @@ -2038,7 +2032,7 @@ call_transmit_status(struct rpc_task *task)
> case -EADDRINUSE:
> case -ENOTCONN:
> case -EPIPE:
> - rpc_task_force_reencode(task);
> + break;
> }
> }
>
> @@ -2185,6 +2179,7 @@ call_status(struct rpc_task *task)
> rpc_exit(task, status);
> break;
> case -EBADMSG:
> + clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
> task->tk_action = call_transmit;
> break;
> default:
> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> index 3973e10ea2bd..45d580cd93ac 100644
> --- a/net/sunrpc/xprt.c
> +++ b/net/sunrpc/xprt.c
> @@ -936,10 +936,18 @@ void xprt_complete_rqst(struct rpc_task *task, int copied)
> /* req->rq_reply_bytes_recvd */
> smp_wmb();
> req->rq_reply_bytes_recvd = copied;
> + clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
> rpc_wake_up_queued_task(&xprt->pending, task);
> }
> EXPORT_SYMBOL_GPL(xprt_complete_rqst);
>
> +static bool
> +xprt_request_data_received(struct rpc_task *task)
> +{
> + return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
> + task->tk_rqstp->rq_reply_bytes_recvd != 0;
> +}
> +
> static void xprt_timer(struct rpc_task *task)
> {
> struct rpc_rqst *req = task->tk_rqstp;
> @@ -1031,12 +1039,13 @@ void xprt_transmit(struct rpc_task *task)
> /* Add request to the receive list */
> spin_lock(&xprt->recv_lock);
> list_add_tail(&req->rq_list, &xprt->recv);
> + set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
> spin_unlock(&xprt->recv_lock);
> xprt_reset_majortimeo(req);
> /* Turn off autodisconnect */
> del_singleshot_timer_sync(&xprt->timer);
> }
> - } else if (!req->rq_bytes_sent)
> + } else if (xprt_request_data_received(task) && !req->rq_bytes_sent)
> return;
>
> connect_cookie = xprt->connect_cookie;
> @@ -1046,9 +1055,11 @@ void xprt_transmit(struct rpc_task *task)
> task->tk_status = status;
> return;
> }
> +
> xprt_inject_disconnect(xprt);
>
> dprintk("RPC: %5u xmit complete\n", task->tk_pid);
> + clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
> task->tk_flags |= RPC_TASK_SENT;
> spin_lock_bh(&xprt->transport_lock);
>
> @@ -1062,14 +1073,14 @@ void xprt_transmit(struct rpc_task *task)
> spin_unlock_bh(&xprt->transport_lock);
>
> req->rq_connect_cookie = connect_cookie;
> - if (rpc_reply_expected(task) && !READ_ONCE(req->rq_reply_bytes_recvd)) {
> + if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
> /*
> * Sleep on the pending queue if we're expecting a reply.
> * The spinlock ensures atomicity between the test of
> * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
> */
> spin_lock(&xprt->recv_lock);
> - if (!req->rq_reply_bytes_recvd) {
> + if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
> rpc_sleep_on(&xprt->pending, task, xprt_timer);
> /*
> * Send an extra queue wakeup call if the
> --
> 2.17.1
>
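The hunk above replaces the rq_reply_bytes_recvd test with a per-task RPC_TASK_NEED_RECV bit: it is set when the request is added to the receive list (before the send), cleared when the reply completes, and tested again under recv_lock before the task goes to sleep. Below is a minimal single-threaded sketch of those state transitions in C11. All names (struct task, TASK_NEED_XMIT, TASK_NEED_RECV, the task_*() helpers) are hypothetical, it assumes a request starts out with the "needs transmit" bit set, and it models the bit operations with <stdatomic.h> while leaving out the spinlock and wait queue entirely, so it is not the kernel code.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical bit numbers, loosely modelled on RPC_TASK_NEED_XMIT/RECV. */
enum { TASK_NEED_XMIT = 0, TASK_NEED_RECV = 1 };

struct task {
	atomic_ulong runstate;          /* stands in for tk_runstate */
	unsigned int reply_bytes_recvd; /* stands in for rq_reply_bytes_recvd */
};

static void task_set_bit(struct task *t, int nr)
{
	atomic_fetch_or(&t->runstate, 1UL << nr);
}

static void task_clear_bit(struct task *t, int nr)
{
	atomic_fetch_and(&t->runstate, ~(1UL << nr));
}

static bool task_test_bit(struct task *t, int nr)
{
	return (atomic_load(&t->runstate) & (1UL << nr)) != 0;
}

/* Assumption: a freshly encoded request starts out needing transmission. */
static void task_init(struct task *t)
{
	atomic_init(&t->runstate, 1UL << TASK_NEED_XMIT);
	t->reply_bytes_recvd = 0;
}

/* Transmit path: NEED_RECV is set before the message goes out, so it is
 * already set by the time any reply could arrive. */
static void task_transmit(struct task *t, bool reply_expected)
{
	if (reply_expected)
		task_set_bit(t, TASK_NEED_RECV);
	/* ... hand the message to the socket here ... */
	task_clear_bit(t, TASK_NEED_XMIT);
}

/* Receive path: record the reply size first, then clear the bit. */
static void task_complete(struct task *t, unsigned int copied)
{
	assert(task_test_bit(t, TASK_NEED_RECV));
	t->reply_bytes_recvd = copied;
	task_clear_bit(t, TASK_NEED_RECV);
}

/* Same shape as xprt_request_data_received() in the hunk. */
static bool task_data_received(struct task *t)
{
	return !task_test_bit(t, TASK_NEED_RECV) && t->reply_bytes_recvd != 0;
}
```

In the patch itself, re-testing the bit under xprt->recv_lock is what closes the window between the test and rpc_sleep_on(); the sketch only shows the order in which the bits change.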

--
Chuck Lever
[email protected]

2018-09-03 21:49:32

by Chuck Lever

[permalink] [raw]
Subject: Re: [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request()



> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <[email protected]> wrote:
>
> When we shift to using the transmit queue, then the task that holds the
> write lock will not necessarily be the same as the one being transmitted.

Can you pass in just the rpc_rqst? Then @task would be available as
xprt->snd_task.


> Signed-off-by: Trond Myklebust <[email protected]>
> ---
> include/linux/sunrpc/xprt.h | 2 +-
> net/sunrpc/xprt.c | 2 +-
> net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 3 +--
> net/sunrpc/xprtrdma/transport.c | 5 ++--
> net/sunrpc/xprtsock.c | 27 +++++++++++-----------
> 5 files changed, 18 insertions(+), 21 deletions(-)
>
> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> index 81a6c2c8dfc7..6d91acfe0644 100644
> --- a/include/linux/sunrpc/xprt.h
> +++ b/include/linux/sunrpc/xprt.h
> @@ -140,7 +140,7 @@ struct rpc_xprt_ops {
> void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
> int (*buf_alloc)(struct rpc_task *task);
> void (*buf_free)(struct rpc_task *task);
> - int (*send_request)(struct rpc_task *task);
> + int (*send_request)(struct rpc_rqst *req, struct rpc_task *task);
> void (*set_retrans_timeout)(struct rpc_task *task);
> void (*timer)(struct rpc_xprt *xprt, struct rpc_task *task);
> void (*release_request)(struct rpc_task *task);
> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> index defc6167570a..3efcb69af526 100644
> --- a/net/sunrpc/xprt.c
> +++ b/net/sunrpc/xprt.c
> @@ -1170,7 +1170,7 @@ void xprt_transmit(struct rpc_task *task)
> }
>
> connect_cookie = xprt->connect_cookie;
> - status = xprt->ops->send_request(task);
> + status = xprt->ops->send_request(req, task);
> trace_xprt_transmit(xprt, req->rq_xid, status);
> if (status != 0) {
> task->tk_status = status;
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> index 09b12b7568fe..d1618c70edb4 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> @@ -215,9 +215,8 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
> * connection.
> */
> static int
> -xprt_rdma_bc_send_request(struct rpc_task *task)
> +xprt_rdma_bc_send_request(struct rpc_rqst *rqst, struct rpc_task *task)
> {
> - struct rpc_rqst *rqst = task->tk_rqstp;
> struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
> struct svcxprt_rdma *rdma;
> int ret;
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index 143ce2579ba9..fa684bf4d090 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -706,9 +706,8 @@ xprt_rdma_free(struct rpc_task *task)
> * sent. Do not try to send this message again.
> */
> static int
> -xprt_rdma_send_request(struct rpc_task *task)
> +xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task *task)

Nit: The function's documenting comment will need to be updated.


> {
> - struct rpc_rqst *rqst = task->tk_rqstp;
> struct rpc_xprt *xprt = rqst->rq_xprt;
> struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
> struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> @@ -741,7 +740,7 @@ xprt_rdma_send_request(struct rpc_task *task)
> /* An RPC with no reply will throw off credit accounting,
> * so drop the connection to reset the credit grant.
> */
> - if (!rpc_reply_expected(task))
> + if (!rpc_reply_expected(rqst->rq_task))
> goto drop_connection;
> return 0;
>
> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> index 8d6404259ff9..b8143eded4af 100644
> --- a/net/sunrpc/xprtsock.c
> +++ b/net/sunrpc/xprtsock.c
> @@ -449,12 +449,12 @@ static void xs_nospace_callback(struct rpc_task *task)
>
> /**
> * xs_nospace - place task on wait queue if transmit was incomplete
> + * @req: pointer to RPC request
> * @task: task to put to sleep
> *
> */
> -static int xs_nospace(struct rpc_task *task)
> +static int xs_nospace(struct rpc_rqst *req, struct rpc_task *task)
> {
> - struct rpc_rqst *req = task->tk_rqstp;
> struct rpc_xprt *xprt = req->rq_xprt;
> struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
> struct sock *sk = transport->inet;
> @@ -513,6 +513,7 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
>
> /**
> * xs_local_send_request - write an RPC request to an AF_LOCAL socket
> + * @req: pointer to RPC request
> * @task: RPC task that manages the state of an RPC request
> *
> * Return values:
> @@ -522,9 +523,8 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
> * ENOTCONN: Caller needs to invoke connect logic then call again
> * other: Some other error occured, the request was not sent
> */
> -static int xs_local_send_request(struct rpc_task *task)
> +static int xs_local_send_request(struct rpc_rqst *req, struct rpc_task *task)
> {
> - struct rpc_rqst *req = task->tk_rqstp;
> struct rpc_xprt *xprt = req->rq_xprt;
> struct sock_xprt *transport =
> container_of(xprt, struct sock_xprt, xprt);
> @@ -569,7 +569,7 @@ static int xs_local_send_request(struct rpc_task *task)
> case -ENOBUFS:
> break;
> case -EAGAIN:
> - status = xs_nospace(task);
> + status = xs_nospace(req, task);
> break;
> default:
> dprintk("RPC: sendmsg returned unrecognized error %d\n",
> @@ -585,6 +585,7 @@ static int xs_local_send_request(struct rpc_task *task)
>
> /**
> * xs_udp_send_request - write an RPC request to a UDP socket
> + * @req: pointer to RPC request
> * @task: address of RPC task that manages the state of an RPC request
> *
> * Return values:
> @@ -594,9 +595,8 @@ static int xs_local_send_request(struct rpc_task *task)
> * ENOTCONN: Caller needs to invoke connect logic then call again
> * other: Some other error occurred, the request was not sent
> */
> -static int xs_udp_send_request(struct rpc_task *task)
> +static int xs_udp_send_request(struct rpc_rqst *req, struct rpc_task *task)
> {
> - struct rpc_rqst *req = task->tk_rqstp;
> struct rpc_xprt *xprt = req->rq_xprt;
> struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
> struct xdr_buf *xdr = &req->rq_snd_buf;
> @@ -638,7 +638,7 @@ static int xs_udp_send_request(struct rpc_task *task)
> /* Should we call xs_close() here? */
> break;
> case -EAGAIN:
> - status = xs_nospace(task);
> + status = xs_nospace(req, task);
> break;
> case -ENETUNREACH:
> case -ENOBUFS:
> @@ -658,6 +658,7 @@ static int xs_udp_send_request(struct rpc_task *task)
>
> /**
> * xs_tcp_send_request - write an RPC request to a TCP socket
> + * @req: pointer to RPC request
> * @task: address of RPC task that manages the state of an RPC request
> *
> * Return values:
> @@ -670,9 +671,8 @@ static int xs_udp_send_request(struct rpc_task *task)
> * XXX: In the case of soft timeouts, should we eventually give up
> * if sendmsg is not able to make progress?
> */
> -static int xs_tcp_send_request(struct rpc_task *task)
> +static int xs_tcp_send_request(struct rpc_rqst *req, struct rpc_task *task)
> {
> - struct rpc_rqst *req = task->tk_rqstp;
> struct rpc_xprt *xprt = req->rq_xprt;
> struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
> struct xdr_buf *xdr = &req->rq_snd_buf;
> @@ -697,7 +697,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
> * completes while the socket holds a reference to the pages,
> * then we may end up resending corrupted data.
> */
> - if (task->tk_flags & RPC_TASK_SENT)
> + if (req->rq_task->tk_flags & RPC_TASK_SENT)
> zerocopy = false;
>
> if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
> @@ -761,7 +761,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
> /* Should we call xs_close() here? */
> break;
> case -EAGAIN:
> - status = xs_nospace(task);
> + status = xs_nospace(req, task);
> break;
> case -ECONNRESET:
> case -ECONNREFUSED:
> @@ -2706,9 +2706,8 @@ static int bc_sendto(struct rpc_rqst *req)
> /*
> * The send routine. Borrows from svc_send
> */
> -static int bc_send_request(struct rpc_task *task)
> +static int bc_send_request(struct rpc_rqst *req, struct rpc_task *task)
> {
> - struct rpc_rqst *req = task->tk_rqstp;
> struct svc_xprt *xprt;
> int len;
>
> --
> 2.17.1
>

--
Chuck Lever
[email protected]

2018-09-03 21:52:13

by Chuck Lever

[permalink] [raw]
Subject: Re: [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit()

Do you also need to move req->rq_ntrans++ ?


> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <[email protected]> wrote:
>
> Signed-off-by: Trond Myklebust <[email protected]>
> ---
> net/sunrpc/clnt.c | 6 ------
> net/sunrpc/xprt.c | 4 ++++
> 2 files changed, 4 insertions(+), 6 deletions(-)
>
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index 585a82dfaf4d..fb40d1e9f636 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -1967,8 +1967,6 @@ call_connect_status(struct rpc_task *task)
> static void
> call_transmit(struct rpc_task *task)
> {
> - int is_retrans = RPC_WAS_SENT(task);
> -
> dprint_status(task);
>
> task->tk_action = call_transmit_status;
> @@ -1979,10 +1977,6 @@ call_transmit(struct rpc_task *task)
> if (!xprt_prepare_transmit(task))
> return;
> xprt_transmit(task);
> - if (task->tk_status < 0)
> - return;
> - if (is_retrans)
> - task->tk_client->cl_stats->rpcretrans++;
> }
>
> /*
> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> index 95d15d4017f7..b85e2c4fa115 100644
> --- a/net/sunrpc/xprt.c
> +++ b/net/sunrpc/xprt.c
> @@ -1126,6 +1126,7 @@ void xprt_transmit(struct rpc_task *task)
> struct rpc_rqst *req = task->tk_rqstp;
> struct rpc_xprt *xprt = req->rq_xprt;
> unsigned int connect_cookie;
> + int is_retrans = RPC_WAS_SENT(task);
> int status;
>
> dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
> @@ -1148,6 +1149,9 @@ void xprt_transmit(struct rpc_task *task)
> return;
> }
>
> + if (is_retrans)
> + task->tk_client->cl_stats->rpcretrans++;
> +
> xprt_inject_disconnect(xprt);
>
> dprintk("RPC: %5u xmit complete\n", task->tk_pid);
> --
> 2.17.1
>

--
Chuck Lever
[email protected]

2018-09-03 22:01:59

by Trond Myklebust

[permalink] [raw]
Subject: Re: [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message

On Mon, 2018-09-03 at 13:11 -0400, Chuck Lever wrote:
> > On Sep 3, 2018, at 11:29 AM, Trond Myklebust <[email protected]> wrote:
> >
> > Currently, we grab the socket bit lock before we allow the message
> > to be XDR encoded. That significantly slows down the transmission
> > rate, since we serialise on a potentially blocking operation.
>
> Which operation blocks, and how often?

RPCSEC_GSS allocates memory when doing privacy encoding, for instance.
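Because encoding can block like this, the cover letter moves it outside XPRT_LOCK and instead re-checks the RPCSEC_GSS sequence window once the lock is held. Here is a minimal single-threaded sketch of that check. It assumes the RFC 2203 rule that the server only accepts sequence numbers within seq_window of the highest one it has seen, and it uses hypothetical names (struct gss_seq_model, msg_encode(), msg_try_transmit()); locking and the real encoding are omitted, and it is not the auth_gss code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

struct gss_seq_model {
	uint32_t seq_next;  /* next sequence number handed out at encode time */
	uint32_t seq_xmit;  /* highest sequence number transmitted so far */
	uint32_t window;    /* seq_window granted by the server */
};

struct msg {
	uint32_t seqno;
	bool encoded;
};

/* Encoding may block (e.g. privacy encoding allocates memory), so in the
 * queued model it runs without the transport lock. */
static void msg_encode(struct gss_seq_model *ctx, struct msg *m)
{
	m->seqno = ctx->seq_next++;
	m->encoded = true;
}

/* Serial-number comparison: true if a is newer than b, modulo 2^32. */
static bool seq_is_newer(uint32_t a, uint32_t b)
{
	return (int32_t)(a - b) > 0;
}

/* Called with the transport lock held. Returns true if the message was
 * sent, false if it fell out of the window and must be re-encoded. */
static bool msg_try_transmit(struct gss_seq_model *ctx, struct msg *m)
{
	assert(m->encoded);
	if (seq_is_newer(m->seqno, ctx->seq_xmit))
		ctx->seq_xmit = m->seqno;
	if (ctx->seq_xmit - m->seqno >= ctx->window) {
		m->encoded = false;  /* drop the stale encoding, retry later */
		return false;
	}
	/* ... transmit the encoded message here ... */
	return true;
}
```

With a window of 4, sending sequence number 6 first makes 1 and 2 stale while 3 through 6 remain acceptable; the cover letter's expectation is that with real windows of 100 or more, such misses are rare.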

> > Signed-off-by: Trond Myklebust <[email protected]>
> > ---
> > net/sunrpc/clnt.c | 8 ++++----
> > 1 file changed, 4 insertions(+), 4 deletions(-)
> >
> > diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> > index e5ac35e803ad..66ec61347716 100644
> > --- a/net/sunrpc/clnt.c
> > +++ b/net/sunrpc/clnt.c
> > @@ -1949,9 +1949,6 @@ call_transmit(struct rpc_task *task)
> > task->tk_action = call_status;
> > if (task->tk_status < 0)
> > return;
> > - if (!xprt_prepare_transmit(task))
> > - return;
> > - task->tk_action = call_transmit_status;
> > /* Encode here so that rpcsec_gss can use correct sequence number. */
> > if (rpc_task_need_encode(task)) {
> > rpc_xdr_encode(task);
> > @@ -1965,8 +1962,11 @@ call_transmit(struct rpc_task *task)
> > return;
> > }
> > }
> > + if (!xprt_prepare_transmit(task))
> > + return;
> > + task->tk_action = call_transmit_status;
> > xprt_transmit(task);
> > - if (task->tk_status < 0)
> > + if (task->tk_status < 0) {
>
> The added curly bracket seems incorrect.

Oops. Yes, that's a rebase brain-fart... Thanks for spotting it!
>
>
> > return;
> > if (is_retrans)
> > task->tk_client->cl_stats->rpcretrans++;
> > --
> > 2.17.1
> >
>
> --
> Chuck Lever
> [email protected]
>
>
>

2018-09-03 22:02:36

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH 00/27] Convert RPC client transmission to a queued model



> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <[email protected]> wrote:
>
> For historical reasons, the RPC client is heavily serialised during the
> process of transmitting a request by the XPRT_LOCK. A request is
> required to take that lock before it can start XDR encoding, and it is
> required to hold it until it is done transmitting. In essence the lock
> protects the following functions:
>
> - Stream based transport connect/reconnect
> - RPCSEC_GSS encoding of the RPC message
> - Transmission of a single RPC message

It also protects TCP rqst slot allocation:

void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	/* Note: grabbing the xprt_lock_write() ensures that we throttle
	 * new slot allocation if the transport is congested (i.e. when
	 * reconnecting a stream transport or when out of socket write
	 * buffer space).
	 */
	if (xprt_lock_write(xprt, task)) {
		xprt_alloc_slot(xprt, task);
		xprt_release_write(xprt, task);
	}
}


> The following patch set assumes that we do not need to do much to
> improve performance of the connect/reconnect case, as that is supposed
> to be a rare occurrence.
>
> The set looks at dealing with RPCSEC_GSS issues by removing serialisation
> while encoding, and simply assuming that if we detect after grabbing the
> XPRT_LOCK that we're about to transmit a message with a sequence number
> that has fallen outside the window allowed by RFC2203, then we can
> abort the transmission of that message, and schedule it for re-encoding.
> Since window sizes are typically expected to lie above 100 messages or
> so, we expect these cases where we miss the window to be rare, in
> general.
>
> Finally, we look at trying to avoid the requirement that every request
> must go through the process of being woken up to grab the XPRT_LOCK in
> order to transmit itself by allowing a request that currently holds the
> XPRT_LOCK to grab other requests from an ordered queue, and to transmit
> them too. The bulk of the changes in this patchset are dedicated to
> providing this functionality.

When considering whether this kind of change could work for
xprtrdma: the transport send lock mechanism is used to manage
the credit grant. The transport send lock prevents the client
from sending too many RPC Calls at once.

Congestion- or flow-controlled transports might not be able to
adopt this approach, because there needs to be a check before
each RPC Call is sent to see if the congestion/credit window
has room.
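To make that concern concrete: if the task holding the send lock drains an ordered transmit queue, the credit (or congestion) check has to move inside the drain loop so that it runs before every call rather than once per lock acquisition. The sketch below shows that shape. Its names (struct mx_xprt, mx_drain(), a fixed credits value) are hypothetical, real xprtrdma credit accounting is more involved, and the sketch does not settle whether the approach is sufficient for it.

```c
#include <assert.h>
#include <stddef.h>

#define MX_MAX_SENT 16

struct mx_req {
	int id;
	struct mx_req *next;
};

struct mx_xprt {
	struct mx_req *head;         /* ordered transmit queue */
	struct mx_req **tail;
	unsigned int credits;        /* calls the peer allows in flight */
	unsigned int in_flight;      /* sent, reply not yet received */
	int sent_ids[MX_MAX_SENT];   /* transmission order, for inspection */
	size_t nsent;
};

static void mx_init(struct mx_xprt *x, unsigned int credits)
{
	x->head = NULL;
	x->tail = &x->head;
	x->credits = credits;
	x->in_flight = 0;
	x->nsent = 0;
}

static void mx_enqueue(struct mx_xprt *x, struct mx_req *r)
{
	r->next = NULL;
	*x->tail = r;
	x->tail = &r->next;
}

/* Run by whichever task holds the send lock: sends queued calls in order,
 * re-checking the credit window before each one. Returns calls sent. */
static size_t mx_drain(struct mx_xprt *x)
{
	size_t sent = 0;

	while (x->head != NULL && x->in_flight < x->credits) {
		struct mx_req *r = x->head;

		x->head = r->next;
		if (x->head == NULL)
			x->tail = &x->head;
		assert(x->nsent < MX_MAX_SENT);
		x->sent_ids[x->nsent++] = r->id;  /* "transmit" r */
		x->in_flight++;
		sent++;
	}
	return sent;  /* anything left stays queued until a credit returns */
}

/* A reply arrived and returned one credit. */
static void mx_reply(struct mx_xprt *x)
{
	assert(x->in_flight > 0);
	x->in_flight--;
}
```

With two credits and five queued calls, the first drain sends calls 0 and 1 and stops; nothing more goes out until mx_reply() returns a credit.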


> Trond Myklebust (27):
> SUNRPC: Clean up initialisation of the struct rpc_rqst
> SUNRPC: If there is no reply expected, bail early from call_decode
> SUNRPC: The transmitted message must lie in the RPCSEC window of
> validity
> SUNRPC: Simplify identification of when the message send/receive is
> complete
> SUNRPC: Avoid holding locks across the XDR encoding of the RPC message
> SUNRPC: Rename TCP receive-specific state variables
> SUNRPC: Move reset of TCP state variables into the reconnect code
> SUNRPC: Add socket transmit queue offset tracking
> SUNRPC: Simplify dealing with aborted partially transmitted messages
> SUNRPC: Refactor the transport request pinning
> SUNRPC: Add a helper to wake up a sleeping rpc_task and set its status
> SUNRPC: Don't wake queued RPC calls multiple times in xprt_transmit
> SUNRPC: Rename xprt->recv_lock to xprt->queue_lock
> SUNRPC: Refactor xprt_transmit() to remove the reply queue code
> SUNRPC: Refactor xprt_transmit() to remove wait for reply code
> SUNRPC: Minor cleanup for call_transmit()
> SUNRPC: Distinguish between the slot allocation list and receive queue
> NFS: Add a transmission queue for RPC requests
> SUNRPC: Refactor RPC call encoding
> SUNRPC: Treat the task and request as separate in the
> xprt_ops->send_request()
> SUNRPC: Don't reset the request 'bytes_sent' counter when releasing
> XPRT_LOCK
> SUNRPC: Simplify xprt_prepare_transmit()
> SUNRPC: Move RPC retransmission stat counter to xprt_transmit()
> SUNRPC: Fix up the back channel transmit
> SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit
> queue
> SUNRPC: Queue the request for transmission immediately after encoding
> SUNRPC: Convert the xprt->sending queue back to an ordinary wait queue
>
> include/linux/sunrpc/auth.h | 2 +
> include/linux/sunrpc/auth_gss.h | 1 +
> include/linux/sunrpc/sched.h | 7 +-
> include/linux/sunrpc/xprt.h | 23 +-
> include/linux/sunrpc/xprtsock.h | 23 +-
> include/trace/events/sunrpc.h | 10 +-
> net/sunrpc/auth.c | 10 +
> net/sunrpc/auth_gss/auth_gss.c | 41 ++
> net/sunrpc/backchannel_rqst.c | 3 +-
> net/sunrpc/clnt.c | 139 +++---
> net/sunrpc/sched.c | 63 ++-
> net/sunrpc/svcsock.c | 6 +-
> net/sunrpc/xprt.c | 503 +++++++++++++--------
> net/sunrpc/xprtrdma/backchannel.c | 3 +-
> net/sunrpc/xprtrdma/rpc_rdma.c | 10 +-
> net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 7 +-
> net/sunrpc/xprtrdma/transport.c | 5 +-
> net/sunrpc/xprtsock.c | 327 +++++++-------
> 18 files changed, 728 insertions(+), 455 deletions(-)
>
> --
> 2.17.1
>

--
Chuck Lever

2018-09-03 22:09:11

by Trond Myklebust

[permalink] [raw]
Subject: Re: [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request()

On Mon, 2018-09-03 at 13:28 -0400, Chuck Lever wrote:
> > On Sep 3, 2018, at 11:29 AM, Trond Myklebust <[email protected]> wrote:
> >
> > When we shift to using the transmit queue, then the task that holds the
> > write lock will not necessarily be the same as the one being transmitted.
>
> Can you pass in just the rpc_rqst? Then @task would be available as
> xprt->snd_task.

No. The point is that when we add queueing, @req->rq_task will usually
be asleep on xprt->sending. Only the task that holds the XPRT_LOCK is
awake, and so it is the one that needs to be put asleep when we hit a
socket nospace/EAGAIN error.

We _could_ potentially avoid passing in the task at all, and just have
xs_nospace() put xprt->snd_task to sleep, but that would skirt the edge
of a transport layering violation.
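The distinction can be sketched as follows. The request being transmitted and the task that must sleep on EAGAIN are passed separately, because under queueing the caller is the XPRT_LOCK holder, not req->rq_task. All names here (struct mt_task, mt_send_request(), a space counter standing in for socket write space) are hypothetical, and "sleeping" is reduced to setting a flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MT_EAGAIN (-11)  /* stand-in for -EAGAIN */

struct mt_task {
	int pid;
	bool waiting_for_space;  /* "asleep" until the socket drains */
};

struct mt_rqst {
	int xid;
	struct mt_task *owner;   /* like rq_task: usually asleep on the queue */
};

struct mt_sock {
	int space;               /* messages the socket can still accept */
};

/* Same shape as send_request(req, task): req says WHAT to send,
 * task says WHO sleeps if the socket is full. */
static int mt_send_request(struct mt_sock *sk, struct mt_rqst *req,
			   struct mt_task *task)
{
	(void)req;  /* a real transport would copy req's buffer out here */
	if (sk->space == 0) {
		task->waiting_for_space = true;
		return MT_EAGAIN;
	}
	sk->space--;
	return 0;
}

/* The lock holder transmits queued requests on behalf of their owners. */
static size_t mt_drain(struct mt_sock *sk, struct mt_rqst **queue, size_t n,
		       struct mt_task *holder)
{
	size_t i;

	assert(holder != NULL);
	for (i = 0; i < n; i++)
		if (mt_send_request(sk, queue[i], holder) == MT_EAGAIN)
			break;
	return i;
}
```

When the socket fills up on the third request, it is the holder that ends up waiting for write space, not the owner of that request. Chuck's alternative would have the transport look the sleeper up itself via xprt->snd_task; Trond's objection is that this would skirt a layering violation.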

2018-09-03 22:11:00

by Trond Myklebust

[permalink] [raw]
Subject: Re: [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit()

On Mon, 2018-09-03 at 13:31 -0400, Chuck Lever wrote:
> Do you also need to move req->rq_ntrans++ ?

Good point. Right now, it is in completely the wrong place.

2018-09-03 22:11:06

by Chuck Lever

[permalink] [raw]
Subject: Re: [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request()



> On Sep 3, 2018, at 1:47 PM, Trond Myklebust <[email protected]> wrote:
>
> On Mon, 2018-09-03 at 13:28 -0400, Chuck Lever wrote:
>>> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <[email protected]> wrote:
>>>
>>> When we shift to using the transmit queue, then the task that holds the
>>> write lock will not necessarily be the same as the one being transmitted.
>>
>> Can you pass in just the rpc_rqst? Then @task would be available as
>> xprt->snd_task.
>
> No. The point is that when we add queueing, @req->rq_task will usually
> be asleep on xprt->sending. Only the task that holds the XPRT_LOCK is
> awake, and so it is the one that needs to be put asleep when we hit a
> socket nospace/EAGAIN error.

Right, that task is rqst->rq_xprt->snd_task, isn't it?


> We _could_ potentially avoid passing in the task at all, and just have
> xs_nospace() put xprt->snd_task to sleep, but that would skirt the edge
> of a transport layering violation.
>
>>
>>> Signed-off-by: Trond Myklebust <[email protected]>
>>> ---
>>> include/linux/sunrpc/xprt.h | 2 +-
>>> net/sunrpc/xprt.c | 2 +-
>>> net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 3 +--
>>> net/sunrpc/xprtrdma/transport.c | 5 ++--
>>> net/sunrpc/xprtsock.c | 27 +++++++++++------
>>> -----
>>> 5 files changed, 18 insertions(+), 21 deletions(-)
>>>
>>> diff --git a/include/linux/sunrpc/xprt.h
>>> b/include/linux/sunrpc/xprt.h
>>> index 81a6c2c8dfc7..6d91acfe0644 100644
>>> --- a/include/linux/sunrpc/xprt.h
>>> +++ b/include/linux/sunrpc/xprt.h
>>> @@ -140,7 +140,7 @@ struct rpc_xprt_ops {
>>> void (*connect)(struct rpc_xprt *xprt, struct
>>> rpc_task *task);
>>> int (*buf_alloc)(struct rpc_task *task);
>>> void (*buf_free)(struct rpc_task *task);
>>> - int (*send_request)(struct rpc_task *task);
>>> + int (*send_request)(struct rpc_rqst *req, struct
>>> rpc_task *task);
>>> void (*set_retrans_timeout)(struct rpc_task
>>> *task);
>>> void (*timer)(struct rpc_xprt *xprt, struct
>>> rpc_task *task);
>>> void (*release_request)(struct rpc_task *task);
>>> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
>>> index defc6167570a..3efcb69af526 100644
>>> --- a/net/sunrpc/xprt.c
>>> +++ b/net/sunrpc/xprt.c
>>> @@ -1170,7 +1170,7 @@ void xprt_transmit(struct rpc_task *task)
>>> }
>>>
>>> connect_cookie = xprt->connect_cookie;
>>> - status = xprt->ops->send_request(task);
>>> + status = xprt->ops->send_request(req, task);
>>> trace_xprt_transmit(xprt, req->rq_xid, status);
>>> if (status != 0) {
>>> task->tk_status = status;
>>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>>> b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>>> index 09b12b7568fe..d1618c70edb4 100644
>>> --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>>> +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>>> @@ -215,9 +215,8 @@ rpcrdma_bc_send_request(struct svcxprt_rdma
>>> *rdma, struct rpc_rqst *rqst)
>>> * connection.
>>> */
>>> static int
>>> -xprt_rdma_bc_send_request(struct rpc_task *task)
>>> +xprt_rdma_bc_send_request(struct rpc_rqst *rqst, struct rpc_task *task)
>>> {
>>> - struct rpc_rqst *rqst = task->tk_rqstp;
>>> struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
>>> struct svcxprt_rdma *rdma;
>>> int ret;
>>> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
>>> index 143ce2579ba9..fa684bf4d090 100644
>>> --- a/net/sunrpc/xprtrdma/transport.c
>>> +++ b/net/sunrpc/xprtrdma/transport.c
>>> @@ -706,9 +706,8 @@ xprt_rdma_free(struct rpc_task *task)
>>> * sent. Do not try to send this message again.
>>> */
>>> static int
>>> -xprt_rdma_send_request(struct rpc_task *task)
>>> +xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task *task)
>>
>> Nit: The function's documenting comment will need to be updated.
>>
>>
>>> {
>>> - struct rpc_rqst *rqst = task->tk_rqstp;
>>> struct rpc_xprt *xprt = rqst->rq_xprt;
>>> struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
>>> struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
>>> @@ -741,7 +740,7 @@ xprt_rdma_send_request(struct rpc_task *task)
>>> /* An RPC with no reply will throw off credit accounting,
>>> * so drop the connection to reset the credit grant.
>>> */
>>> - if (!rpc_reply_expected(task))
>>> + if (!rpc_reply_expected(rqst->rq_task))
>>> goto drop_connection;
>>> return 0;
>>>
>>> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
>>> index 8d6404259ff9..b8143eded4af 100644
>>> --- a/net/sunrpc/xprtsock.c
>>> +++ b/net/sunrpc/xprtsock.c
>>> @@ -449,12 +449,12 @@ static void xs_nospace_callback(struct rpc_task *task)
>>>
>>> /**
>>> * xs_nospace - place task on wait queue if transmit was incomplete
>>> + * @req: pointer to RPC request
>>> * @task: task to put to sleep
>>> *
>>> */
>>> -static int xs_nospace(struct rpc_task *task)
>>> +static int xs_nospace(struct rpc_rqst *req, struct rpc_task *task)
>>> {
>>> - struct rpc_rqst *req = task->tk_rqstp;
>>> struct rpc_xprt *xprt = req->rq_xprt;
>>> struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
>>> struct sock *sk = transport->inet;
>>> @@ -513,6 +513,7 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
>>>
>>> /**
>>> * xs_local_send_request - write an RPC request to an AF_LOCAL socket
>>> + * @req: pointer to RPC request
>>> * @task: RPC task that manages the state of an RPC request
>>> *
>>> * Return values:
>>> @@ -522,9 +523,8 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
>>> * ENOTCONN: Caller needs to invoke connect logic then call again
>>> * other: Some other error occured, the request was not sent
>>> */
>>> -static int xs_local_send_request(struct rpc_task *task)
>>> +static int xs_local_send_request(struct rpc_rqst *req, struct rpc_task *task)
>>> {
>>> - struct rpc_rqst *req = task->tk_rqstp;
>>> struct rpc_xprt *xprt = req->rq_xprt;
>>> struct sock_xprt *transport =
>>> container_of(xprt, struct sock_xprt, xprt);
>>> @@ -569,7 +569,7 @@ static int xs_local_send_request(struct rpc_task *task)
>>> case -ENOBUFS:
>>> break;
>>> case -EAGAIN:
>>> - status = xs_nospace(task);
>>> + status = xs_nospace(req, task);
>>> break;
>>> default:
>>> dprintk("RPC: sendmsg returned unrecognized error %d\n",
>>> @@ -585,6 +585,7 @@ static int xs_local_send_request(struct rpc_task *task)
>>>
>>> /**
>>> * xs_udp_send_request - write an RPC request to a UDP socket
>>> + * @req: pointer to RPC request
>>> * @task: address of RPC task that manages the state of an RPC request
>>> *
>>> * Return values:
>>> @@ -594,9 +595,8 @@ static int xs_local_send_request(struct rpc_task *task)
>>> * ENOTCONN: Caller needs to invoke connect logic then call again
>>> * other: Some other error occurred, the request was not sent
>>> */
>>> -static int xs_udp_send_request(struct rpc_task *task)
>>> +static int xs_udp_send_request(struct rpc_rqst *req, struct rpc_task *task)
>>> {
>>> - struct rpc_rqst *req = task->tk_rqstp;
>>> struct rpc_xprt *xprt = req->rq_xprt;
>>> struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
>>> struct xdr_buf *xdr = &req->rq_snd_buf;
>>> @@ -638,7 +638,7 @@ static int xs_udp_send_request(struct rpc_task *task)
>>> /* Should we call xs_close() here? */
>>> break;
>>> case -EAGAIN:
>>> - status = xs_nospace(task);
>>> + status = xs_nospace(req, task);
>>> break;
>>> case -ENETUNREACH:
>>> case -ENOBUFS:
>>> @@ -658,6 +658,7 @@ static int xs_udp_send_request(struct rpc_task *task)
>>>
>>> /**
>>> * xs_tcp_send_request - write an RPC request to a TCP socket
>>> + * @req: pointer to RPC request
>>> * @task: address of RPC task that manages the state of an RPC request
>>> *
>>> * Return values:
>>> @@ -670,9 +671,8 @@ static int xs_udp_send_request(struct rpc_task *task)
>>> * XXX: In the case of soft timeouts, should we eventually give up
>>> * if sendmsg is not able to make progress?
>>> */
>>> -static int xs_tcp_send_request(struct rpc_task *task)
>>> +static int xs_tcp_send_request(struct rpc_rqst *req, struct rpc_task *task)
>>> {
>>> - struct rpc_rqst *req = task->tk_rqstp;
>>> struct rpc_xprt *xprt = req->rq_xprt;
>>> struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
>>> struct xdr_buf *xdr = &req->rq_snd_buf;
>>> @@ -697,7 +697,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
>>> * completes while the socket holds a reference to the pages,
>>> * then we may end up resending corrupted data.
>>> */
>>> - if (task->tk_flags & RPC_TASK_SENT)
>>> + if (req->rq_task->tk_flags & RPC_TASK_SENT)
>>> zerocopy = false;
>>>
>>> if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
>>> @@ -761,7 +761,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
>>> /* Should we call xs_close() here? */
>>> break;
>>> case -EAGAIN:
>>> - status = xs_nospace(task);
>>> + status = xs_nospace(req, task);
>>> break;
>>> case -ECONNRESET:
>>> case -ECONNREFUSED:
>>> @@ -2706,9 +2706,8 @@ static int bc_sendto(struct rpc_rqst *req)
>>> /*
>>> * The send routine. Borrows from svc_send
>>> */
>>> -static int bc_send_request(struct rpc_task *task)
>>> +static int bc_send_request(struct rpc_rqst *req, struct rpc_task *task)
>>> {
>>> - struct rpc_rqst *req = task->tk_rqstp;
>>> struct svc_xprt *xprt;
>>> int len;
>>>
>>> --
>>> 2.17.1
>>>
>>
>> --
>> Chuck Lever
>> [email protected]
>>
>>
>>
>

--
Chuck Lever
[email protected]
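The hunks above change send_request to receive the struct rpc_rqst explicitly instead of deriving it from task->tk_rqstp. A minimal user-space sketch of why that separation matters once a task holding the transport lock may transmit requests queued by other tasks (the stated goal of the series): the callee has to take per-request state from the request and its owner, not from whichever task happens to be sending. All names here (struct task, struct rqst, struct xprt_ops, demo_send(), transmit_all()) are hypothetical simplifications, not the kernel definitions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

struct rqst;

/* Hypothetical, heavily simplified stand-in for struct rpc_task. */
struct task {
	int pid;
	struct rqst *own_rqst;	/* plays the role of task->tk_rqstp */
};

/* Hypothetical, heavily simplified stand-in for struct rpc_rqst. */
struct rqst {
	unsigned int xid;
	struct task *owner;	/* plays the role of req->rq_task */
	int sent;
};

/* New-style op: the request is passed explicitly; 'sender' is the
 * task doing the transmission, which need not own 'req'. */
struct xprt_ops {
	int (*send_request)(struct rqst *req, struct task *sender);
};

static int demo_send(struct rqst *req, struct task *sender)
{
	/* Per-request state comes from req and req->owner,
	 * never from sender->own_rqst. */
	req->sent = 1;
	printf("task %d sent xid %u for task %d\n",
	       sender->pid, req->xid, req->owner->pid);
	return 0;
}

static const struct xprt_ops demo_ops = { .send_request = demo_send };

/* The sender transmits every queued request, its own or not. */
static int transmit_all(const struct xprt_ops *ops, struct rqst **queue,
			size_t n, struct task *sender)
{
	assert(ops != NULL && sender != NULL);
	for (size_t i = 0; i < n; i++) {
		int status = ops->send_request(queue[i], sender);

		if (status != 0)
			return status;
	}
	return 0;
}
```

This mirrors the quoted xprtrdma and xprtsock hunks, which replace rpc_reply_expected(task) with rpc_reply_expected(rqst->rq_task) and task->tk_flags with req->rq_task->tk_flags: flags describing a request are read through the request's own task.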

2018-09-03 22:16:57

by Trond Myklebust

Subject: Re: [PATCH 00/27] Convert RPC client transmission to a queued model

On Mon, 2018-09-03 at 13:41 -0400, Chuck Lever wrote:
> > On Sep 3, 2018, at 11:29 AM, Trond Myklebust <[email protected]> wrote:
> >
> > For historical reasons, the RPC client is heavily serialised during the
> > process of transmitting a request by the XPRT_LOCK. A request is
> > required to take that lock before it can start XDR encoding, and it is
> > required to hold it until it is done transmitting. In essence the lock
> > protects the following functions:
> >
> > - Stream based transport connect/reconnect
> > - RPCSEC_GSS encoding of the RPC message
> > - Transmission of a single RPC message
>
> It also protects TCP rqst slot allocation:
>
> void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
> {
> 	/* Note: grabbing the xprt_lock_write() ensures that we throttle
> 	 * new slot allocation if the transport is congested (i.e. when
> 	 * reconnecting a stream transport or when out of socket write
> 	 * buffer space).
> 	 */
> 	if (xprt_lock_write(xprt, task)) {
> 		xprt_alloc_slot(xprt, task);
> 		xprt_release_write(xprt, task);
> 	}
> }

Ack. That needs some thought.

>
> > The following patch set assumes that we do not need to do much to
> > improve performance of the connect/reconnect case, as that is supposed
> > to be a rare occurrence.
> >
> > The set looks at dealing with RPCSEC_GSS issues by removing serialisation
> > while encoding, and simply assuming that if we detect after grabbing the
> > XPRT_LOCK that we're about to transmit a message with a sequence number
> > that has fallen outside the window allowed by RFC2203, then we can
> > abort the transmission of that message, and schedule it for re-encoding.
> > Since window sizes are typically expected to lie above 100 messages or
> > so, we expect these cases where we miss the window to be rare, in
> > general.
> >
> > Finally, we look at trying to avoid the requirement that every request
> > must go through the process of being woken up to grab the XPRT_LOCK in
> > order to transmit itself by allowing a request that currently holds the
> > XPRT_LOCK to grab other requests from an ordered queue, and to transmit
> > them too. The bulk of the changes in this patchset are dedicated to
> > providing this functionality.
>
> When considering whether this kind of change could work for
> xprtrdma: the transport send lock mechanism is used to manage
> the credit grant. The transport send lock prevents the client
> from sending too many RPC Calls at once.
>
> Congestion- or flow-controlled transports might not be able to
> adopt this approach, because there needs to be a check before
> each RPC Call is sent to see if the congestion/credit window
> has room.

That's a good point. We might want to put an additional check for
congestion overflow in the send request to push back when we hit the
credit window limit. I'll think about that.
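Chuck's concern and the proposed push-back can be illustrated together with a small user-space sketch, assuming a FIFO transmit queue that is drained by whichever caller wins the transport lock, with a credit check before every call. Everything here (struct demo_xprt, demo_enqueue(), demo_drain(), the QCAP bound) is hypothetical; a C11 atomic_flag stands in for XPRT_LOCK, and the sketch does not model waking waiters when the peer grants more credits.

```c
#include <assert.h>
#include <errno.h>
#include <stdatomic.h>
#include <stddef.h>

#define QCAP 16

/* Hypothetical transport: a FIFO transmit queue, a credit window,
 * and an atomic flag standing in for XPRT_LOCK. */
struct demo_xprt {
	atomic_flag xmit_lock;
	unsigned int queue[QCAP];	/* XIDs awaiting transmission */
	size_t head, count;
	unsigned int credits;		/* calls the peer will accept now */
	unsigned int sent;
};

static void demo_enqueue(struct demo_xprt *x, unsigned int xid)
{
	assert(x->count < QCAP);
	x->queue[(x->head + x->count) % QCAP] = xid;
	x->count++;
}

/*
 * The lock winner drains the queue in order, checking the credit
 * window before every call. Returns 0 if the queue was emptied,
 * -EAGAIN if credits ran out (the rest stay queued), or -EBUSY if
 * another caller already holds the lock and is doing the draining.
 */
static int demo_drain(struct demo_xprt *x)
{
	int ret = 0;

	if (atomic_flag_test_and_set(&x->xmit_lock))
		return -EBUSY;

	while (x->count > 0) {
		if (x->credits == 0) {
			ret = -EAGAIN;	/* push back: window is full */
			break;
		}
		/* A real transport would send x->queue[x->head] here. */
		x->head = (x->head + 1) % QCAP;
		x->count--;
		x->credits--;
		x->sent++;
	}

	atomic_flag_clear(&x->xmit_lock);
	return ret;
}
```

In this sketch the credit check sits inside the drain loop rather than at lock acquisition, because one lock holder may send many requests during a single hold of the lock.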

>
> > Trond Myklebust (27):
> > SUNRPC: Clean up initialisation of the struct rpc_rqst
> > SUNRPC: If there is no reply expected, bail early from call_decode
> > SUNRPC: The transmitted message must lie in the RPCSEC window of validity
> > SUNRPC: Simplify identification of when the message send/receive is complete
> > SUNRPC: Avoid holding locks across the XDR encoding of the RPC message
> > SUNRPC: Rename TCP receive-specific state variables
> > SUNRPC: Move reset of TCP state variables into the reconnect code
> > SUNRPC: Add socket transmit queue offset tracking
> > SUNRPC: Simplify dealing with aborted partially transmitted messages
> > SUNRPC: Refactor the transport request pinning
> > SUNRPC: Add a helper to wake up a sleeping rpc_task and set its status
> > SUNRPC: Don't wake queued RPC calls multiple times in xprt_transmit
> > SUNRPC: Rename xprt->recv_lock to xprt->queue_lock
> > SUNRPC: Refactor xprt_transmit() to remove the reply queue code
> > SUNRPC: Refactor xprt_transmit() to remove wait for reply code
> > SUNRPC: Minor cleanup for call_transmit()
> > SUNRPC: Distinguish between the slot allocation list and receive queue
> > NFS: Add a transmission queue for RPC requests
> > SUNRPC: Refactor RPC call encoding
> > SUNRPC: Treat the task and request as separate in the xprt_ops->send_request()
> > SUNRPC: Don't reset the request 'bytes_sent' counter when releasing XPRT_LOCK
> > SUNRPC: Simplify xprt_prepare_transmit()
> > SUNRPC: Move RPC retransmission stat counter to xprt_transmit()
> > SUNRPC: Fix up the back channel transmit
> > SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit queue
> > SUNRPC: Queue the request for transmission immediately after encoding
> > SUNRPC: Convert the xprt->sending queue back to an ordinary wait queue
> >
> > include/linux/sunrpc/auth.h | 2 +
> > include/linux/sunrpc/auth_gss.h | 1 +
> > include/linux/sunrpc/sched.h | 7 +-
> > include/linux/sunrpc/xprt.h | 23 +-
> > include/linux/sunrpc/xprtsock.h | 23 +-
> > include/trace/events/sunrpc.h | 10 +-
> > net/sunrpc/auth.c | 10 +
> > net/sunrpc/auth_gss/auth_gss.c | 41 ++
> > net/sunrpc/backchannel_rqst.c | 3 +-
> > net/sunrpc/clnt.c | 139 +++---
> > net/sunrpc/sched.c | 63 ++-
> > net/sunrpc/svcsock.c | 6 +-
> > net/sunrpc/xprt.c | 503 +++++++++++++--------
> > net/sunrpc/xprtrdma/backchannel.c | 3 +-
> > net/sunrpc/xprtrdma/rpc_rdma.c | 10 +-
> > net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 7 +-
> > net/sunrpc/xprtrdma/transport.c | 5 +-
> > net/sunrpc/xprtsock.c | 327 +++++++-------
> > 18 files changed, 728 insertions(+), 455 deletions(-)
> >
> > --
> > 2.17.1
> >
>
> --
> Chuck Lever
>
>
>

2018-09-04 20:12:01

by J. Bruce Fields

Subject: Re: [PATCH 03/27] SUNRPC: The transmitted message must lie in the RPCSEC window of validity

On Mon, Sep 03, 2018 at 11:29:12AM -0400, Trond Myklebust wrote:
> If a message has been encoded using RPCSEC_GSS, the server is
> maintaining a window of sequence numbers that it considers valid.
> The client should normally be tracking that window, and needs to
> verify that the sequence number used by the message being transmitted
> still lies inside the window of validity.
>
> So far, we've been able to assume this condition would be realised
> automatically, since the server has been encoding the message only

Do you mean "the client has been encoding..."?

--b.

> after taking the socket lock. Once we change that condition, we
> will need the explicit check.
>
> Signed-off-by: Trond Myklebust <[email protected]>
> ---
> include/linux/sunrpc/auth.h | 2 ++
> include/linux/sunrpc/auth_gss.h | 1 +
> net/sunrpc/auth.c | 10 ++++++++
> net/sunrpc/auth_gss/auth_gss.c | 41 +++++++++++++++++++++++++++++++++
> net/sunrpc/clnt.c | 3 +++
> net/sunrpc/xprt.c | 7 ++++++
> 6 files changed, 64 insertions(+)
>
> diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
> index 58a6765c1c5e..2c97a3933ef9 100644
> --- a/include/linux/sunrpc/auth.h
> +++ b/include/linux/sunrpc/auth.h
> @@ -157,6 +157,7 @@ struct rpc_credops {
> int (*crkey_timeout)(struct rpc_cred *);
> bool (*crkey_to_expire)(struct rpc_cred *);
> char * (*crstringify_acceptor)(struct rpc_cred *);
> + bool (*crneed_reencode)(struct rpc_task *);
> };
>
> extern const struct rpc_authops authunix_ops;
> @@ -192,6 +193,7 @@ __be32 * rpcauth_marshcred(struct rpc_task *, __be32 *);
> __be32 * rpcauth_checkverf(struct rpc_task *, __be32 *);
> int rpcauth_wrap_req(struct rpc_task *task, kxdreproc_t encode, void *rqstp, __be32 *data, void *obj);
> int rpcauth_unwrap_resp(struct rpc_task *task, kxdrdproc_t decode, void *rqstp, __be32 *data, void *obj);
> +bool rpcauth_xmit_need_reencode(struct rpc_task *task);
> int rpcauth_refreshcred(struct rpc_task *);
> void rpcauth_invalcred(struct rpc_task *);
> int rpcauth_uptodatecred(struct rpc_task *);
> diff --git a/include/linux/sunrpc/auth_gss.h b/include/linux/sunrpc/auth_gss.h
> index 0c9eac351aab..30427b729070 100644
> --- a/include/linux/sunrpc/auth_gss.h
> +++ b/include/linux/sunrpc/auth_gss.h
> @@ -70,6 +70,7 @@ struct gss_cl_ctx {
> refcount_t count;
> enum rpc_gss_proc gc_proc;
> u32 gc_seq;
> + u32 gc_seq_xmit;
> spinlock_t gc_seq_lock;
> struct gss_ctx *gc_gss_ctx;
> struct xdr_netobj gc_wire_ctx;
> diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
> index 305ecea92170..59df5cdba0ac 100644
> --- a/net/sunrpc/auth.c
> +++ b/net/sunrpc/auth.c
> @@ -817,6 +817,16 @@ rpcauth_unwrap_resp(struct rpc_task *task, kxdrdproc_t decode, void *rqstp,
> return rpcauth_unwrap_req_decode(decode, rqstp, data, obj);
> }
>
> +bool
> +rpcauth_xmit_need_reencode(struct rpc_task *task)
> +{
> + struct rpc_cred *cred = task->tk_rqstp->rq_cred;
> +
> + if (!cred || !cred->cr_ops->crneed_reencode)
> + return false;
> + return cred->cr_ops->crneed_reencode(task);
> +}
> +
> int
> rpcauth_refreshcred(struct rpc_task *task)
> {
> diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
> index 21c0aa0a0d1d..c898a7c75e84 100644
> --- a/net/sunrpc/auth_gss/auth_gss.c
> +++ b/net/sunrpc/auth_gss/auth_gss.c
> @@ -1984,6 +1984,46 @@ gss_unwrap_req_decode(kxdrdproc_t decode, struct rpc_rqst *rqstp,
> return decode(rqstp, &xdr, obj);
> }
>
> +static bool
> +gss_seq_is_newer(u32 new, u32 old)
> +{
> + return (s32)(new - old) > 0;
> +}
> +
> +static bool
> +gss_xmit_need_reencode(struct rpc_task *task)
> +{
> + struct rpc_rqst *req = task->tk_rqstp;
> + struct rpc_cred *cred = req->rq_cred;
> + struct gss_cl_ctx *ctx = gss_cred_get_ctx(cred);
> + u32 win, seq_xmit;
> + bool ret = true;
> +
> + if (!ctx)
> + return true;
> +
> + if (gss_seq_is_newer(req->rq_seqno, READ_ONCE(ctx->gc_seq)))
> + goto out;
> +
> + seq_xmit = READ_ONCE(ctx->gc_seq_xmit);
> + while (gss_seq_is_newer(req->rq_seqno, seq_xmit)) {
> + u32 tmp = seq_xmit;
> +
> + seq_xmit = cmpxchg(&ctx->gc_seq_xmit, tmp, req->rq_seqno);
> + if (seq_xmit == tmp) {
> + ret = false;
> + goto out;
> + }
> + }
> +
> + win = ctx->gc_win;
> + if (win > 0)
> + ret = !gss_seq_is_newer(req->rq_seqno, seq_xmit - win);
> +out:
> + gss_put_ctx(ctx);
> + return ret;
> +}
> +
> static int
> gss_unwrap_resp(struct rpc_task *task,
> kxdrdproc_t decode, void *rqstp, __be32 *p, void *obj)
> @@ -2052,6 +2092,7 @@ static const struct rpc_credops gss_credops = {
> .crunwrap_resp = gss_unwrap_resp,
> .crkey_timeout = gss_key_timeout,
> .crstringify_acceptor = gss_stringify_acceptor,
> + .crneed_reencode = gss_xmit_need_reencode,
> };
>
> static const struct rpc_credops gss_nullops = {
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index 4f1ec8013332..d41b5ac1d4e8 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -2184,6 +2184,9 @@ call_status(struct rpc_task *task)
> /* shutdown or soft timeout */
> rpc_exit(task, status);
> break;
> + case -EBADMSG:
> + task->tk_action = call_transmit;
> + break;
> default:
> if (clnt->cl_chatty)
> printk("%s: RPC call returned error %d\n",
> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> index 6aa09edc9567..3973e10ea2bd 100644
> --- a/net/sunrpc/xprt.c
> +++ b/net/sunrpc/xprt.c
> @@ -1014,6 +1014,13 @@ void xprt_transmit(struct rpc_task *task)
> dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
>
> if (!req->rq_reply_bytes_recvd) {
> +
> + /* Verify that our message lies in the RPCSEC_GSS window */
> + if (!req->rq_bytes_sent && rpcauth_xmit_need_reencode(task)) {
> + task->tk_status = -EBADMSG;
> + return;
> + }
> +
> if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
> /*
> * Add to the list only if we're expecting a reply
> --
> 2.17.1
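The window check in gss_xmit_need_reencode() can be transcribed to user space as a sketch, using C11 atomics in place of the kernel's READ_ONCE() and cmpxchg(). Note that atomic_compare_exchange_strong() reports failure by reloading the expected value, which replaces the explicit tmp variable in the kernel loop. struct demo_gss_ctx, seq_is_newer() and demo_need_reencode() are hypothetical names; the fields mirror gc_seq, gc_seq_xmit and gc_win from the patch.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Wraparound-safe "a is after b" for 32-bit sequence numbers, the same
 * idiom as gss_seq_is_newer(). Relies on two's complement conversion,
 * as the kernel's (s32) cast does. */
static bool seq_is_newer(uint32_t a, uint32_t b)
{
	return (int32_t)(a - b) > 0;
}

/* Hypothetical stand-in for the gss_cl_ctx fields used by the patch. */
struct demo_gss_ctx {
	_Atomic uint32_t seq;		/* sequence counter (gc_seq) */
	_Atomic uint32_t seq_xmit;	/* newest seqno transmitted (gc_seq_xmit) */
	uint32_t win;			/* server's window size (gc_win) */
};

/* Returns true if a message encoded with 'seqno' must be re-encoded
 * before it is sent, following the logic of the quoted patch. */
static bool demo_need_reencode(struct demo_gss_ctx *ctx, uint32_t seqno)
{
	uint32_t seq_xmit;

	/* As in the patch: a seqno beyond the context counter is rejected. */
	if (seq_is_newer(seqno, atomic_load(&ctx->seq)))
		return true;

	seq_xmit = atomic_load(&ctx->seq_xmit);
	while (seq_is_newer(seqno, seq_xmit)) {
		/* Try to advance seq_xmit to seqno; on failure seq_xmit is
		 * reloaded with the current value and the loop re-checks. */
		if (atomic_compare_exchange_strong(&ctx->seq_xmit,
						   &seq_xmit, seqno))
			return false;	/* newest message sent so far */
	}

	/* Older than the newest transmitted: valid only inside the window. */
	if (ctx->win > 0)
		return !seq_is_newer(seqno, seq_xmit - ctx->win);
	return true;
}
```

In the patch, a true result makes xprt_transmit() set tk_status to -EBADMSG before anything is sent, and call_status() answers -EBADMSG by setting tk_action back to call_transmit.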