2017-08-18 18:51:59

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH v4 0/5] RPC client latency fixes

The following patches apply on top of the writeback patches sent last
week. They start the process of reducing contention in the RPC client
receive code due to excessive holding of the transport lock.

The first patch is the most important in that it allows the client to
drop the transport lock while copying data from the socket into the
RPC call's reply buffers.
The second ensures that we don't keep hogging the workqueue worker thread
forever, but that we reschedule the receive job if it keeps looping.
Finally, a small cleanup to avoid having to make a copy of the
struct xdr_skb_reader on the stack.

v2: Reorder patches to add I/O throttling after the core RPC latency
changes
Fix a crash when receiving backchannel requests.

v3: Another bugfix for backchannel requests.
Add a separate spinlock to protect the RPC request receive list
Remove I/O throttling patches

v4: Fix up the two remaining callers of xprt_lookup_rqst() and
xprt_complete_rqst() to use the new locking


Trond Myklebust (5):
SUNRPC: Don't hold the transport lock across socket copy operations
SUNRPC: Don't hold the transport lock when receiving backchannel data
SUNRPC: Don't loop forever in xs_tcp_data_receive()
SUNRPC: Cleanup xs_tcp_read_common()
SUNRPC: Add a separate spinlock to protect the RPC request receive
list

include/linux/sunrpc/sched.h | 2 +
include/linux/sunrpc/xprt.h | 3 ++
net/sunrpc/backchannel_rqst.c | 4 +-
net/sunrpc/svcsock.c | 6 +--
net/sunrpc/xprt.c | 55 ++++++++++++++++++--
net/sunrpc/xprtrdma/rpc_rdma.c | 8 +--
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 7 ++-
net/sunrpc/xprtsock.c | 82 ++++++++++++++++--------------
8 files changed, 113 insertions(+), 54 deletions(-)

--
2.13.5



2017-08-18 18:52:00

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH v4 1/5] SUNRPC: Don't hold the transport lock across socket copy operations

Instead add a mechanism to ensure that the request doesn't disappear
from underneath us while copying from the socket. We do this by
preventing xprt_release() from freeing the XDR buffers until the
flag RPC_TASK_MSG_RECV has been cleared from the request.

Signed-off-by: Trond Myklebust <[email protected]>
Reviewed-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/sched.h | 2 ++
include/linux/sunrpc/xprt.h | 2 ++
net/sunrpc/xprt.c | 43 +++++++++++++++++++++++++++++++++++++++++++
net/sunrpc/xprtsock.c | 23 ++++++++++++++++++-----
4 files changed, 65 insertions(+), 5 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 50a99a117da7..c1768f9d993b 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -139,6 +139,8 @@ struct rpc_task_setup {
#define RPC_TASK_RUNNING 0
#define RPC_TASK_QUEUED 1
#define RPC_TASK_ACTIVE 2
+#define RPC_TASK_MSG_RECV 3
+#define RPC_TASK_MSG_RECV_WAIT 4

#define RPC_IS_RUNNING(t) test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
#define rpc_set_running(t) set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index eab1c749e192..65b9e0224753 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -372,6 +372,8 @@ void xprt_write_space(struct rpc_xprt *xprt);
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result);
struct rpc_rqst * xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid);
void xprt_complete_rqst(struct rpc_task *task, int copied);
+void xprt_pin_rqst(struct rpc_rqst *req);
+void xprt_unpin_rqst(struct rpc_rqst *req);
void xprt_release_rqst_cong(struct rpc_task *task);
void xprt_disconnect_done(struct rpc_xprt *xprt);
void xprt_force_disconnect(struct rpc_xprt *xprt);
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 4654a9934269..3eb9ec16eec4 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -844,6 +844,48 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

+/**
+ * xprt_pin_rqst - Pin a request on the transport receive list
+ * @req: Request to pin
+ *
+ * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
+ * so should be holding the xprt transport lock.
+ */
+void xprt_pin_rqst(struct rpc_rqst *req)
+{
+ set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
+}
+
+/**
+ * xprt_unpin_rqst - Unpin a request on the transport receive list
+ * @req: Request to pin
+ *
+ * Caller should be holding the xprt transport lock.
+ */
+void xprt_unpin_rqst(struct rpc_rqst *req)
+{
+ struct rpc_task *task = req->rq_task;
+
+ clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate);
+ if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
+ wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
+}
+
+static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
+__must_hold(&req->rq_xprt->transport_lock)
+{
+ struct rpc_task *task = req->rq_task;
+
+ if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
+ spin_unlock_bh(&req->rq_xprt->transport_lock);
+ set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
+ wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
+ TASK_UNINTERRUPTIBLE);
+ clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
+ spin_lock_bh(&req->rq_xprt->transport_lock);
+ }
+}
+
static void xprt_update_rtt(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
@@ -1295,6 +1337,7 @@ void xprt_release(struct rpc_task *task)
list_del(&req->rq_list);
xprt->last_used = jiffies;
xprt_schedule_autodisconnect(xprt);
+ xprt_wait_on_pinned_rqst(req);
spin_unlock_bh(&xprt->transport_lock);
if (req->rq_buffer)
xprt->ops->buf_free(task);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 4f154d388748..04dbc7027712 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -973,6 +973,8 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr)
goto out_unlock;
+ xprt_pin_rqst(rovr);
+ spin_unlock_bh(&xprt->transport_lock);
task = rovr->rq_task;

copied = rovr->rq_private_buf.buflen;
@@ -981,11 +983,14 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,

if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
dprintk("RPC: sk_buff copy failed\n");
- goto out_unlock;
+ spin_lock_bh(&xprt->transport_lock);
+ goto out_unpin;
}

+ spin_lock_bh(&xprt->transport_lock);
xprt_complete_rqst(task, copied);
-
+out_unpin:
+ xprt_unpin_rqst(rovr);
out_unlock:
spin_unlock_bh(&xprt->transport_lock);
}
@@ -1054,6 +1059,8 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr)
goto out_unlock;
+ xprt_pin_rqst(rovr);
+ spin_unlock_bh(&xprt->transport_lock);
task = rovr->rq_task;

if ((copied = rovr->rq_private_buf.buflen) > repsize)
@@ -1062,14 +1069,17 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
/* Suck it into the iovec, verify checksum if not done by hw. */
if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
- goto out_unlock;
+ spin_lock_bh(&xprt->transport_lock);
+ goto out_unpin;
}

__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);

+ spin_lock_bh(&xprt->transport_lock);
xprt_adjust_cwnd(xprt, task, copied);
xprt_complete_rqst(task, copied);
-
+out_unpin:
+ xprt_unpin_rqst(rovr);
out_unlock:
spin_unlock_bh(&xprt->transport_lock);
}
@@ -1351,12 +1361,15 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
spin_unlock_bh(&xprt->transport_lock);
return -1;
}
+ xprt_pin_rqst(req);
+ spin_unlock_bh(&xprt->transport_lock);

xs_tcp_read_common(xprt, desc, req);

+ spin_lock_bh(&xprt->transport_lock);
if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
xprt_complete_rqst(req->rq_task, transport->tcp_copied);
-
+ xprt_unpin_rqst(req);
spin_unlock_bh(&xprt->transport_lock);
return 0;
}
--
2.13.5


2017-08-18 18:52:04

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH v4 3/5] SUNRPC: Don't loop forever in xs_tcp_data_receive()

Ensure that we don't hog the workqueue thread by requeuing the job
every 64 loops.

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/xprtsock.c | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index a2312f14beb4..e8f44fc76754 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1526,6 +1526,7 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
.arg.data = xprt,
};
unsigned long total = 0;
+ int loop;
int read = 0;

mutex_lock(&transport->recv_mutex);
@@ -1534,20 +1535,20 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
goto out;

/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
- for (;;) {
+ for (loop = 0; loop < 64; loop++) {
lock_sock(sk);
read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
if (read <= 0) {
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
release_sock(sk);
- if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
- break;
- } else {
- release_sock(sk);
- total += read;
+ break;
}
+ release_sock(sk);
+ total += read;
rd_desc.count = 65536;
}
+ if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+ queue_work(xprtiod_workqueue, &transport->recv_worker);
out:
mutex_unlock(&transport->recv_mutex);
trace_xs_tcp_data_ready(xprt, read, total);
--
2.13.5


2017-08-18 18:52:05

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH v4 4/5] SUNRPC: Cleanup xs_tcp_read_common()

Simplify the code to avoid a full copy of the struct xdr_skb_reader.

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/xprtsock.c | 25 ++++++++-----------------
1 file changed, 8 insertions(+), 17 deletions(-)

diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index e8f44fc76754..a344bea15fc7 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1287,25 +1287,12 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
}

len = desc->count;
- if (len > transport->tcp_reclen - transport->tcp_offset) {
- struct xdr_skb_reader my_desc;
-
- len = transport->tcp_reclen - transport->tcp_offset;
- memcpy(&my_desc, desc, sizeof(my_desc));
- my_desc.count = len;
- r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
- &my_desc, xdr_skb_read_bits);
- desc->count -= r;
- desc->offset += r;
- } else
- r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
+ if (len > transport->tcp_reclen - transport->tcp_offset)
+ desc->count = transport->tcp_reclen - transport->tcp_offset;
+ r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
desc, xdr_skb_read_bits);

- if (r > 0) {
- transport->tcp_copied += r;
- transport->tcp_offset += r;
- }
- if (r != len) {
+ if (desc->count) {
/* Error when copying to the receive buffer,
* usually because we weren't able to allocate
* additional buffer pages. All we can do now
@@ -1325,6 +1312,10 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
return;
}

+ transport->tcp_copied += r;
+ transport->tcp_offset += r;
+ desc->count = len - r;
+
dprintk("RPC: XID %08x read %zd bytes\n",
ntohl(transport->tcp_xid), r);
dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
--
2.13.5


2017-08-18 18:52:01

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH v4 2/5] SUNRPC: Don't hold the transport lock when receiving backchannel data

The backchannel request has no associated task, so it is going nowhere
until we call xprt_complete_bc_request().

Signed-off-by: Trond Myklebust <[email protected]>
---
net/sunrpc/backchannel_rqst.c | 4 ++--
net/sunrpc/xprtsock.c | 5 +----
2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index ac701c28f44f..c2c68a15b59d 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -171,10 +171,10 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
/*
* Add the temporary list to the backchannel preallocation list
*/
- spin_lock_bh(&xprt->bc_pa_lock);
+ spin_lock(&xprt->bc_pa_lock);
list_splice(&tmp_list, &xprt->bc_pa_list);
xprt_inc_alloc_count(xprt, min_reqs);
- spin_unlock_bh(&xprt->bc_pa_lock);
+ spin_unlock(&xprt->bc_pa_lock);

dprintk("RPC: setup backchannel transport done\n");
return 0;
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 04dbc7027712..a2312f14beb4 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1389,11 +1389,9 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
container_of(xprt, struct sock_xprt, xprt);
struct rpc_rqst *req;

- /* Look up and lock the request corresponding to the given XID */
- spin_lock_bh(&xprt->transport_lock);
+ /* Look up the request corresponding to the given XID */
req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
if (req == NULL) {
- spin_unlock_bh(&xprt->transport_lock);
printk(KERN_WARNING "Callback slot table overflowed\n");
xprt_force_disconnect(xprt);
return -1;
@@ -1404,7 +1402,6 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,

if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
xprt_complete_bc_request(req, transport->tcp_copied);
- spin_unlock_bh(&xprt->transport_lock);

return 0;
}
--
2.13.5


2017-08-18 18:52:08

by Trond Myklebust

[permalink] [raw]
Subject: [PATCH v4 5/5] SUNRPC: Add a separate spinlock to protect the RPC request receive list

This further reduces contention with the transport_lock, and allows us
to convert to using a non-bh-safe spinlock, since the list is now never
accessed from a bh context.

Signed-off-by: Trond Myklebust <[email protected]>
---
include/linux/sunrpc/xprt.h | 1 +
net/sunrpc/svcsock.c | 6 +++---
net/sunrpc/xprt.c | 20 ++++++++++++--------
net/sunrpc/xprtrdma/rpc_rdma.c | 8 ++++----
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 7 +++++--
net/sunrpc/xprtsock.c | 30 ++++++++++++++++--------------
6 files changed, 41 insertions(+), 31 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 65b9e0224753..a97e6de5f9f2 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -232,6 +232,7 @@ struct rpc_xprt {
*/
spinlock_t transport_lock; /* lock transport info */
spinlock_t reserve_lock; /* lock slot table */
+ spinlock_t recv_lock; /* lock receive list */
u32 xid; /* Next XID value to use */
struct rpc_task * snd_task; /* Task blocked in send */
struct svc_xprt *bc_xprt; /* NFSv4.1 backchannel */
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 2b720fa35c4f..272063ca81e8 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -1001,7 +1001,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)

if (!bc_xprt)
return -EAGAIN;
- spin_lock_bh(&bc_xprt->transport_lock);
+ spin_lock(&bc_xprt->recv_lock);
req = xprt_lookup_rqst(bc_xprt, xid);
if (!req)
goto unlock_notfound;
@@ -1019,7 +1019,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
memcpy(dst->iov_base, src->iov_base, src->iov_len);
xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len);
rqstp->rq_arg.len = 0;
- spin_unlock_bh(&bc_xprt->transport_lock);
+ spin_unlock(&bc_xprt->recv_lock);
return 0;
unlock_notfound:
printk(KERN_NOTICE
@@ -1028,7 +1028,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
__func__, ntohl(calldir),
bc_xprt, ntohl(xid));
unlock_eagain:
- spin_unlock_bh(&bc_xprt->transport_lock);
+ spin_unlock(&bc_xprt->recv_lock);
return -EAGAIN;
}

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 3eb9ec16eec4..2af189c5ac3e 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -872,17 +872,17 @@ void xprt_unpin_rqst(struct rpc_rqst *req)
}

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
-__must_hold(&req->rq_xprt->transport_lock)
+__must_hold(&req->rq_xprt->recv_lock)
{
struct rpc_task *task = req->rq_task;

if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
- spin_unlock_bh(&req->rq_xprt->transport_lock);
+ spin_unlock(&req->rq_xprt->recv_lock);
set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
TASK_UNINTERRUPTIBLE);
clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
- spin_lock_bh(&req->rq_xprt->transport_lock);
+ spin_lock(&req->rq_xprt->recv_lock);
}
}

@@ -1008,13 +1008,13 @@ void xprt_transmit(struct rpc_task *task)
/*
* Add to the list only if we're expecting a reply
*/
- spin_lock_bh(&xprt->transport_lock);
/* Update the softirq receive buffer */
memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
sizeof(req->rq_private_buf));
/* Add request to the receive list */
+ spin_lock(&xprt->recv_lock);
list_add_tail(&req->rq_list, &xprt->recv);
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->recv_lock);
xprt_reset_majortimeo(req);
/* Turn off autodisconnect */
del_singleshot_timer_sync(&xprt->timer);
@@ -1329,15 +1329,18 @@ void xprt_release(struct rpc_task *task)
task->tk_ops->rpc_count_stats(task, task->tk_calldata);
else if (task->tk_client)
rpc_count_iostats(task, task->tk_client->cl_metrics);
+ spin_lock(&xprt->recv_lock);
+ if (!list_empty(&req->rq_list)) {
+ list_del(&req->rq_list);
+ xprt_wait_on_pinned_rqst(req);
+ }
+ spin_unlock(&xprt->recv_lock);
spin_lock_bh(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task);
if (xprt->ops->release_request)
xprt->ops->release_request(task);
- if (!list_empty(&req->rq_list))
- list_del(&req->rq_list);
xprt->last_used = jiffies;
xprt_schedule_autodisconnect(xprt);
- xprt_wait_on_pinned_rqst(req);
spin_unlock_bh(&xprt->transport_lock);
if (req->rq_buffer)
xprt->ops->buf_free(task);
@@ -1361,6 +1364,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)

spin_lock_init(&xprt->transport_lock);
spin_lock_init(&xprt->reserve_lock);
+ spin_lock_init(&xprt->recv_lock);

INIT_LIST_HEAD(&xprt->free);
INIT_LIST_HEAD(&xprt->recv);
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index ca4d6e4528f3..dfa748a0c8de 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1051,7 +1051,7 @@ rpcrdma_reply_handler(struct work_struct *work)
* RPC completion while holding the transport lock to ensure
* the rep, rqst, and rq_task pointers remain stable.
*/
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->recv_lock);
rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
if (!rqst)
goto out_norqst;
@@ -1136,7 +1136,7 @@ rpcrdma_reply_handler(struct work_struct *work)
xprt_release_rqst_cong(rqst->rq_task);

xprt_complete_rqst(rqst->rq_task, status);
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->recv_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
__func__, xprt, rqst, status);
return;
@@ -1187,12 +1187,12 @@ rpcrdma_reply_handler(struct work_struct *work)
r_xprt->rx_stats.bad_reply_count++;
goto out;

-/* The req was still available, but by the time the transport_lock
+/* The req was still available, but by the time the recv_lock
* was acquired, the rqst and task had been released. Thus the RPC
* has already been terminated.
*/
out_norqst:
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->recv_lock);
rpcrdma_buffer_put(req);
dprintk("RPC: %s: race, no rqst left for req %p\n",
__func__, req);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index c676ed0efb5a..0d574cda242d 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -52,7 +52,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
if (src->iov_len < 24)
goto out_shortreply;

- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->recv_lock);
req = xprt_lookup_rqst(xprt, xid);
if (!req)
goto out_notfound;
@@ -69,17 +69,20 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
credits = r_xprt->rx_buf.rb_bc_max_requests;

+ spin_lock_bh(&xprt->transport_lock);
cwnd = xprt->cwnd;
xprt->cwnd = credits << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(req->rq_task);
+ spin_unlock_bh(&xprt->transport_lock);
+

ret = 0;
xprt_complete_rqst(req->rq_task, rcvbuf->len);
rcvbuf->len = 0;

out_unlock:
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->recv_lock);
out:
return ret;

diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index a344bea15fc7..2b918137aaa0 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -969,12 +969,12 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
return;

/* Look up and lock the request corresponding to the given XID */
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->recv_lock);
rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr)
goto out_unlock;
xprt_pin_rqst(rovr);
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->recv_lock);
task = rovr->rq_task;

copied = rovr->rq_private_buf.buflen;
@@ -983,16 +983,16 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,

if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
dprintk("RPC: sk_buff copy failed\n");
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->recv_lock);
goto out_unpin;
}

- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->recv_lock);
xprt_complete_rqst(task, copied);
out_unpin:
xprt_unpin_rqst(rovr);
out_unlock:
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->recv_lock);
}

static void xs_local_data_receive(struct sock_xprt *transport)
@@ -1055,12 +1055,12 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
return;

/* Look up and lock the request corresponding to the given XID */
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->recv_lock);
rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr)
goto out_unlock;
xprt_pin_rqst(rovr);
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->recv_lock);
task = rovr->rq_task;

if ((copied = rovr->rq_private_buf.buflen) > repsize)
@@ -1069,7 +1069,7 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
/* Suck it into the iovec, verify checksum if not done by hw. */
if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->recv_lock);
goto out_unpin;
}

@@ -1077,11 +1077,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,

spin_lock_bh(&xprt->transport_lock);
xprt_adjust_cwnd(xprt, task, copied);
+ spin_unlock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->recv_lock);
xprt_complete_rqst(task, copied);
out_unpin:
xprt_unpin_rqst(rovr);
out_unlock:
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->recv_lock);
}

static void xs_udp_data_receive(struct sock_xprt *transport)
@@ -1344,24 +1346,24 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));

/* Find and lock the request corresponding to this xid */
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->recv_lock);
req = xprt_lookup_rqst(xprt, transport->tcp_xid);
if (!req) {
dprintk("RPC: XID %08x request not found!\n",
ntohl(transport->tcp_xid));
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->recv_lock);
return -1;
}
xprt_pin_rqst(req);
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->recv_lock);

xs_tcp_read_common(xprt, desc, req);

- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->recv_lock);
if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
xprt_complete_rqst(req->rq_task, transport->tcp_copied);
xprt_unpin_rqst(req);
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->recv_lock);
return 0;
}

--
2.13.5