This series begins with the usual fixes, then introduces patches
that add support for bi-directional RPC/RDMA. Bi-directional
RPC/RDMA is a prerequisite for NFSv4.1 on RDMA transports. The
series includes both client-side and server-side support, though the
server side is not as far along as I had hoped and could be postponed
to 4.5.
This v1 is an initial request for review, not a "these suckers are
ready to be merged" posting.
Also available in the "nfs-rdma-for-4.4" topic branch of this git repo:
git://git.linux-nfs.org/projects/cel/cel-2.6.git
Or for browsing:
http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfs-rdma-for-4.4
---
Chuck Lever (18):
xprtrdma: Enable swap-on-NFS/RDMA
xprtrdma: Replace global lkey with lkey local to PD
xprtrdma: Remove completion polling budgets
xprtrdma: Refactor reply handler error handling
xprtrdma: Replace send and receive arrays
SUNRPC: Abstract backchannel operations
xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers
xprtrdma: Pre-allocate Work Requests for backchannel
xprtrdma: Add support for sending backward direction RPC replies
xprtrdma: Handle incoming backward direction RPC calls
svcrdma: Add backward direction service for RPC/RDMA transport
SUNRPC: Remove the TCP-only restriction in bc_svc_process()
NFS: Enable client side NFSv4.1 backchannel to use other transports
svcrdma: Define maximum number of backchannel requests
svcrdma: Add svc_rdma_get_context() API that is allowed to fail
svcrdma: Add infrastructure to send backwards direction RPC/RDMA calls
svcrdma: Add infrastructure to receive backwards direction RPC/RDMA replies
xprtrdma: Add class for RDMA backwards direction transport
fs/nfs/callback.c | 33 ++-
include/linux/sunrpc/bc_xprt.h | 5
include/linux/sunrpc/svc_rdma.h | 15 +
include/linux/sunrpc/xprt.h | 6
net/sunrpc/backchannel_rqst.c | 24 ++
net/sunrpc/svc.c | 5
net/sunrpc/xprt.c | 1
net/sunrpc/xprtrdma/Makefile | 1
net/sunrpc/xprtrdma/backchannel.c | 368 ++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/fmr_ops.c | 19 --
net/sunrpc/xprtrdma/frwr_ops.c | 5
net/sunrpc/xprtrdma/physical_ops.c | 10 -
net/sunrpc/xprtrdma/rpc_rdma.c | 212 ++++++++++++++---
net/sunrpc/xprtrdma/svc_rdma.c | 6
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 60 +++++
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 63 +++++
net/sunrpc/xprtrdma/svc_rdma_transport.c | 104 ++++++++
net/sunrpc/xprtrdma/transport.c | 253 ++++++++++++++++++++-
net/sunrpc/xprtrdma/verbs.c | 341 ++++++++++++++++------------
net/sunrpc/xprtrdma/xprt_rdma.h | 56 ++++-
net/sunrpc/xprtsock.c | 16 +
21 files changed, 1341 insertions(+), 262 deletions(-)
create mode 100644 net/sunrpc/xprtrdma/backchannel.c
--
Chuck Lever
After adding a swapfile on an NFS/RDMA mount and removing the
normal swap partition, I was able to push the NFS client well
into swap without any issue.
I forgot to swapoff the NFS-backed swapfile before rebooting. This
pinned the NFS mount and the IB core and provider modules, causing
shutdown to hang. I think this is expected and safe behavior.
Shutdown scripts should probably "swapoff -a" before unmounting any
filesystems.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 41e452b..e9e5ed7 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
static int
xprt_rdma_enable_swap(struct rpc_xprt *xprt)
{
- return -EINVAL;
+ return 0;
}
static void
The core API has changed: for devices that do not have a global DMA
lkey, the core now automatically creates a per-PD DMA MR and makes
its lkey available. The global DMA lkey interface is going away in
favor of the per-PD DMA lkey.
The per-PD DMA lkey is always available. Convert xprtrdma to use the
device's per-PD DMA lkey for regbufs, no matter which memory
registration scheme is in use.
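As an aside, here is a minimal sketch (not part of this patch; the
function name and parameters are made up for illustration) of what the
per-PD lkey buys: a locally DMA-mapped SGE can be filled in without
testing IB_DEVICE_LOCAL_DMA_LKEY or keeping a DMA MR around.

    static void example_fill_sge(struct ib_pd *pd, struct ib_sge *sge,
                                 u64 dma_addr, u32 length)
    {
            sge->addr   = dma_addr;
            sge->length = length;
            sge->lkey   = pd->local_dma_lkey;
    }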
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/fmr_ops.c | 19 -------------------
net/sunrpc/xprtrdma/frwr_ops.c | 5 -----
net/sunrpc/xprtrdma/physical_ops.c | 10 +---------
net/sunrpc/xprtrdma/verbs.c | 2 +-
net/sunrpc/xprtrdma/xprt_rdma.h | 1 -
5 files changed, 2 insertions(+), 35 deletions(-)
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index cb25c89..f1e8daf 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -39,25 +39,6 @@ static int
fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
struct rpcrdma_create_data_internal *cdata)
{
- struct ib_device_attr *devattr = &ia->ri_devattr;
- struct ib_mr *mr;
-
- /* Obtain an lkey to use for the regbufs, which are
- * protected from remote access.
- */
- if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
- ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
- } else {
- mr = ib_get_dma_mr(ia->ri_pd, IB_ACCESS_LOCAL_WRITE);
- if (IS_ERR(mr)) {
- pr_err("%s: ib_get_dma_mr for failed with %lX\n",
- __func__, PTR_ERR(mr));
- return -ENOMEM;
- }
- ia->ri_dma_lkey = ia->ri_dma_mr->lkey;
- ia->ri_dma_mr = mr;
- }
-
return 0;
}
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 21b3efb..004f1ad 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -189,11 +189,6 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
struct ib_device_attr *devattr = &ia->ri_devattr;
int depth, delta;
- /* Obtain an lkey to use for the regbufs, which are
- * protected from remote access.
- */
- ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
-
ia->ri_max_frmr_depth =
min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
devattr->max_fast_reg_page_list_len);
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
index 72cf8b1..617b76f 100644
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ b/net/sunrpc/xprtrdma/physical_ops.c
@@ -23,7 +23,6 @@ static int
physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
struct rpcrdma_create_data_internal *cdata)
{
- struct ib_device_attr *devattr = &ia->ri_devattr;
struct ib_mr *mr;
/* Obtain an rkey to use for RPC data payloads.
@@ -37,15 +36,8 @@ physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
__func__, PTR_ERR(mr));
return -ENOMEM;
}
- ia->ri_dma_mr = mr;
-
- /* Obtain an lkey to use for regbufs.
- */
- if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)
- ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
- else
- ia->ri_dma_lkey = ia->ri_dma_mr->lkey;
+ ia->ri_dma_mr = mr;
return 0;
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 01a314a..8a477e2 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1255,7 +1255,7 @@ rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
goto out_free;
iov->length = size;
- iov->lkey = ia->ri_dma_lkey;
+ iov->lkey = ia->ri_pd->local_dma_lkey;
rb->rg_size = size;
rb->rg_owner = NULL;
return rb;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 0251222..c09414e 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -65,7 +65,6 @@ struct rpcrdma_ia {
struct rdma_cm_id *ri_id;
struct ib_pd *ri_pd;
struct ib_mr *ri_dma_mr;
- u32 ri_dma_lkey;
struct completion ri_done;
int ri_async_rc;
unsigned int ri_max_frmr_depth;
Commit 8301a2c047cc ("xprtrdma: Limit work done by completion
handler") was supposed to prevent xprtrdma's upcall handlers from
starving other softIRQ work by letting them return to the provider
before all CQEs have been polled.
The logic assumes the provider will call the upcall handler again
immediately if the CQ is re-armed while there are still queued CQEs.
This assumption is invalid. The IBTA spec says that after a CQ is
armed, the hardware must interrupt only when a new CQE is inserted.
xprtrdma can't rely on the provider calling again, even though some
providers do.
Therefore, leaving CQEs queued makes sense only when some other
mechanism ensures that all remaining CQEs are consumed in a timely
fashion. xprtrdma has no such mechanism. If a CQE remains queued,
the transport can wait forever to send the next RPC.
Finally, move the wcs array back onto the stack to ensure that the
poll array is always local to the CPU where the completion upcall is
running.
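For illustration, the drain-then-rearm shape adopted below looks
roughly like this sketch (the names, array size, and the
example_process_wc() helper are made up; this is not the patch's
exact code):

    static void example_drain_cq(struct ib_cq *cq)
    {
            struct ib_wc wcs[4];
            int i, n;

            /* Poll until the CQ returns fewer entries than requested,
             * which means it is now empty.
             */
            do {
                    n = ib_poll_cq(cq, ARRAY_SIZE(wcs), wcs);
                    for (i = 0; i < n; i++)
                            example_process_wc(&wcs[i]);
            } while (n == ARRAY_SIZE(wcs));

            /* Re-arm only after the CQ has been drained completely. */
            ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
                                 IB_CQ_REPORT_MISSED_EVENTS);
    }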
Fixes: 8301a2c047cc ("xprtrdma: Limit work done by completion ...")
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 100 ++++++++++++++++++---------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 5 --
2 files changed, 45 insertions(+), 60 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 8a477e2..f2e3863 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -158,34 +158,37 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
}
}
-static int
+/* The wc array is on stack: automatic memory is always CPU-local.
+ *
+ * The common case is a single completion is ready. By asking
+ * for two entries, a return code of 1 means there is exactly
+ * one completion and no more. We don't have to poll again to
+ * know that the CQ is now empty.
+ */
+static void
rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
- struct ib_wc *wcs;
- int budget, count, rc;
+ struct ib_wc *pos, wcs[2];
+ int count, rc;
- budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
do {
- wcs = ep->rep_send_wcs;
+ pos = wcs;
- rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
- if (rc <= 0)
- return rc;
+ rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+ if (rc < 0)
+ goto out_warn;
count = rc;
while (count-- > 0)
- rpcrdma_sendcq_process_wc(wcs++);
- } while (rc == RPCRDMA_POLLSIZE && --budget);
- return 0;
+ rpcrdma_sendcq_process_wc(pos++);
+ } while (rc == ARRAY_SIZE(wcs));
+ return;
+
+out_warn:
+ pr_warn("RPC: %s: ib_poll_cq() failed %i\n", __func__, rc);
}
-/*
- * Handle send, fast_reg_mr, and local_inv completions.
- *
- * Send events are typically suppressed and thus do not result
- * in an upcall. Occasionally one is signaled, however. This
- * prevents the provider's completion queue from wrapping and
- * losing a completion.
+/* Handle provider send completion upcalls.
*/
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
@@ -193,12 +196,7 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
int rc;
- rc = rpcrdma_sendcq_poll(cq, ep);
- if (rc) {
- dprintk("RPC: %s: ib_poll_cq failed: %i\n",
- __func__, rc);
- return;
- }
+ rpcrdma_sendcq_poll(cq, ep);
rc = ib_req_notify_cq(cq,
IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
@@ -247,44 +245,41 @@ out_fail:
goto out_schedule;
}
-static int
+/* The wc array is on stack: automatic memory is always CPU-local.
+ *
+ * struct ib_wc is 64 bytes, making the poll array potentially
+ * large. But this is at the bottom of the call chain. Further
+ * substantial work is done in another thread.
+ */
+static void
rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
- struct list_head sched_list;
- struct ib_wc *wcs;
- int budget, count, rc;
+ struct ib_wc *pos, wcs[4];
+ LIST_HEAD(sched_list);
+ int count, rc;
- INIT_LIST_HEAD(&sched_list);
- budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
do {
- wcs = ep->rep_recv_wcs;
+ pos = wcs;
- rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
- if (rc <= 0)
- goto out_schedule;
+ rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+ if (rc < 0)
+ goto out_warn;
count = rc;
while (count-- > 0)
- rpcrdma_recvcq_process_wc(wcs++, &sched_list);
- } while (rc == RPCRDMA_POLLSIZE && --budget);
- rc = 0;
+ rpcrdma_recvcq_process_wc(pos++, &sched_list);
+ } while (rc == ARRAY_SIZE(wcs));
out_schedule:
rpcrdma_schedule_tasklet(&sched_list);
- return rc;
+ return;
+
+out_warn:
+ pr_warn("RPC: %s: ib_poll_cq() failed %i\n", __func__, rc);
+ goto out_schedule;
}
-/*
- * Handle receive completions.
- *
- * It is reentrant but processes single events in order to maintain
- * ordering of receives to keep server credits.
- *
- * It is the responsibility of the scheduled tasklet to return
- * recv buffers to the pool. NOTE: this affects synchronization of
- * connection shutdown. That is, the structures required for
- * the completion of the reply handler must remain intact until
- * all memory has been reclaimed.
+/* Handle provider receive completion upcalls.
*/
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
@@ -292,12 +287,7 @@ rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
int rc;
- rc = rpcrdma_recvcq_poll(cq, ep);
- if (rc) {
- dprintk("RPC: %s: ib_poll_cq failed: %i\n",
- __func__, rc);
- return;
- }
+ rpcrdma_recvcq_poll(cq, ep);
rc = ib_req_notify_cq(cq,
IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c09414e..42c8d44 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -77,9 +77,6 @@ struct rpcrdma_ia {
* RDMA Endpoint -- one per transport instance
*/
-#define RPCRDMA_WC_BUDGET (128)
-#define RPCRDMA_POLLSIZE (16)
-
struct rpcrdma_ep {
atomic_t rep_cqcount;
int rep_cqinit;
@@ -89,8 +86,6 @@ struct rpcrdma_ep {
struct rdma_conn_param rep_remote_cma;
struct sockaddr_storage rep_remote_addr;
struct delayed_work rep_connect_worker;
- struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
- struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
};
/*
Clean up: The error cases in rpcrdma_reply_handler() almost never
execute. Ensure the compiler places them out of the hot path.
No behavior change expected.
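Purely as an illustration of the pattern (this is not the function's
code; names and values are invented), rarely-taken failures branch
forward to labels placed after the success return, keeping the hot
path straight:

    static int example_check_reply(unsigned int len)
    {
            if (len == 0)
                    goto out_empty;
            if (len < 28)
                    goto out_short;
            return 0;

    out_empty:
            pr_warn("example: zero-length reply\n");
            return -ENODATA;

    out_short:
            pr_warn("example: short reply (%u bytes)\n", len);
            return -EMSGSIZE;
    }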
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 90 ++++++++++++++++++++++-----------------
net/sunrpc/xprtrdma/verbs.c | 2 -
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +
3 files changed, 54 insertions(+), 40 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index bc8bd65..287c874 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -741,52 +741,27 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
unsigned long cwnd;
u32 credits;
- /* Check status. If bad, signal disconnect and return rep to pool */
- if (rep->rr_len == ~0U) {
- rpcrdma_recv_buffer_put(rep);
- if (r_xprt->rx_ep.rep_connected == 1) {
- r_xprt->rx_ep.rep_connected = -EIO;
- rpcrdma_conn_func(&r_xprt->rx_ep);
- }
- return;
- }
- if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
- dprintk("RPC: %s: short/invalid reply\n", __func__);
- goto repost;
- }
+ dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
+
+ if (rep->rr_len == RPCRDMA_BAD_LEN)
+ goto out_badstatus;
+ if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+ goto out_shortreply;
+
headerp = rdmab_to_msg(rep->rr_rdmabuf);
- if (headerp->rm_vers != rpcrdma_version) {
- dprintk("RPC: %s: invalid version %d\n",
- __func__, be32_to_cpu(headerp->rm_vers));
- goto repost;
- }
+ if (headerp->rm_vers != rpcrdma_version)
+ goto out_badversion;
/* Get XID and try for a match. */
spin_lock(&xprt->transport_lock);
rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
- if (rqst == NULL) {
- spin_unlock(&xprt->transport_lock);
- dprintk("RPC: %s: reply 0x%p failed "
- "to match any request xid 0x%08x len %d\n",
- __func__, rep, be32_to_cpu(headerp->rm_xid),
- rep->rr_len);
-repost:
- r_xprt->rx_stats.bad_reply_count++;
- if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
- rpcrdma_recv_buffer_put(rep);
-
- return;
- }
+ if (!rqst)
+ goto out_nomatch;
/* get request object */
req = rpcr_to_rdmar(rqst);
- if (req->rl_reply) {
- spin_unlock(&xprt->transport_lock);
- dprintk("RPC: %s: duplicate reply 0x%p to RPC "
- "request 0x%p: xid 0x%08x\n", __func__, rep, req,
- be32_to_cpu(headerp->rm_xid));
- goto repost;
- }
+ if (req->rl_reply)
+ goto out_duplicate;
dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
" RPC request 0x%p xid 0x%08x\n",
@@ -883,8 +858,45 @@ badheader:
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);
+ xprt_complete_rqst(rqst->rq_task, status);
+ spin_unlock(&xprt->transport_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
__func__, xprt, rqst, status);
- xprt_complete_rqst(rqst->rq_task, status);
+ return;
+
+out_badstatus:
+ rpcrdma_recv_buffer_put(rep);
+ if (r_xprt->rx_ep.rep_connected == 1) {
+ r_xprt->rx_ep.rep_connected = -EIO;
+ rpcrdma_conn_func(&r_xprt->rx_ep);
+ }
+ return;
+
+out_shortreply:
+ dprintk("RPC: %s: short/invalid reply\n", __func__);
+ goto repost;
+
+out_badversion:
+ dprintk("RPC: %s: invalid version %d\n",
+ __func__, be32_to_cpu(headerp->rm_vers));
+ goto repost;
+
+out_nomatch:
+ spin_unlock(&xprt->transport_lock);
+ dprintk("RPC: %s: reply 0x%p failed "
+ "to match any request xid 0x%08x len %d\n",
+ __func__, rep, be32_to_cpu(headerp->rm_xid),
+ rep->rr_len);
+ goto repost;
+
+out_duplicate:
spin_unlock(&xprt->transport_lock);
+ dprintk("RPC: %s: duplicate reply 0x%p to RPC "
+ "request 0x%p: xid 0x%08x\n", __func__, rep, req,
+ be32_to_cpu(headerp->rm_xid));
+
+repost:
+ r_xprt->rx_stats.bad_reply_count++;
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ rpcrdma_recv_buffer_put(rep);
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f2e3863..ac1345b 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -241,7 +241,7 @@ out_fail:
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("RPC: %s: rep %p: %s\n",
__func__, rep, ib_wc_status_msg(wc->status));
- rep->rr_len = ~0U;
+ rep->rr_len = RPCRDMA_BAD_LEN;
goto out_schedule;
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 42c8d44..a13508b 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -168,6 +168,8 @@ struct rpcrdma_rep {
struct rpcrdma_regbuf *rr_rdmabuf;
};
+#define RPCRDMA_BAD_LEN (~0U)
+
/*
* struct rpcrdma_mw - external memory region metadata
*
The rb_send_bufs and rb_recv_bufs arrays are used to implement a
pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep
structs. Replace those arrays with free lists.
To allow more than 512 RPCs in flight at once, each of these arrays
would be larger than a page (assuming 8-byte addresses and 4KB
pages). Allowing up to 64K in-flight RPCs (as TCP now does), each
buffer array would have to be 128 pages: an order-7 contiguous
allocation. (Not that we're going there.)
A list is easier to expand dynamically. Instead of allocating a
larger array of pointers and copying the existing pointers to the
new array, simply append more buffers to each list.
This also makes it simpler to manage receive buffers that might
catch backwards-direction calls, or to post receive buffers in
bulk to amortize the overhead of ib_post_recv.
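To illustrate the "simply append more buffers" point, growing the
receive pool becomes a loop like the sketch below (illustrative only;
it assumes a rep allocation helper such as rpcrdma_create_rep(),
which a later patch in this series exports):

    static int example_grow_recv_pool(struct rpcrdma_xprt *r_xprt,
                                      unsigned int count)
    {
            struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
            unsigned long flags;

            while (count--) {
                    struct rpcrdma_rep *rep = rpcrdma_create_rep(r_xprt);

                    if (IS_ERR(rep))
                            return PTR_ERR(rep);

                    /* No array to resize: just add the new rep to the
                     * free list under rb_lock.
                     */
                    spin_lock_irqsave(&buf->rb_lock, flags);
                    list_add(&rep->rr_list, &buf->rb_recv_bufs);
                    spin_unlock_irqrestore(&buf->rb_lock, flags);
            }
            return 0;
    }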
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 141 +++++++++++++++++----------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 9 +-
2 files changed, 66 insertions(+), 84 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ac1345b..8d99214 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -962,44 +962,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- char *p;
- size_t len;
int i, rc;
- buf->rb_max_requests = cdata->max_requests;
+ buf->rb_max_requests = r_xprt->rx_data.max_requests;
spin_lock_init(&buf->rb_lock);
- /* Need to allocate:
- * 1. arrays for send and recv pointers
- * 2. arrays of struct rpcrdma_req to fill in pointers
- * 3. array of struct rpcrdma_rep for replies
- * Send/recv buffers in req/rep need to be registered
- */
- len = buf->rb_max_requests *
- (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
-
- p = kzalloc(len, GFP_KERNEL);
- if (p == NULL) {
- dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
- __func__, len);
- rc = -ENOMEM;
- goto out;
- }
- buf->rb_pool = p; /* for freeing it later */
-
- buf->rb_send_bufs = (struct rpcrdma_req **) p;
- p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
- buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
- p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
-
rc = ia->ri_ops->ro_init(r_xprt);
if (rc)
goto out;
+ INIT_LIST_HEAD(&buf->rb_send_bufs);
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
- struct rpcrdma_rep *rep;
req = rpcrdma_create_req(r_xprt);
if (IS_ERR(req)) {
@@ -1008,7 +982,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
rc = PTR_ERR(req);
goto out;
}
- buf->rb_send_bufs[i] = req;
+ list_add(&req->rl_free, &buf->rb_send_bufs);
+ }
+
+ INIT_LIST_HEAD(&buf->rb_recv_bufs);
+ for (i = 0; i < buf->rb_max_requests + 2; i++) {
+ struct rpcrdma_rep *rep;
rep = rpcrdma_create_rep(r_xprt);
if (IS_ERR(rep)) {
@@ -1017,7 +996,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
rc = PTR_ERR(rep);
goto out;
}
- buf->rb_recv_bufs[i] = rep;
+ list_add(&rep->rr_list, &buf->rb_recv_bufs);
}
return 0;
@@ -1051,25 +1030,26 @@ void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_ia *ia = rdmab_to_ia(buf);
- int i;
- /* clean up in reverse order from create
- * 1. recv mr memory (mr free, then kfree)
- * 2. send mr memory (mr free, then kfree)
- * 3. MWs
- */
- dprintk("RPC: %s: entering\n", __func__);
+ while (!list_empty(&buf->rb_recv_bufs)) {
+ struct rpcrdma_rep *rep = list_entry(buf->rb_recv_bufs.next,
+ struct rpcrdma_rep,
+ rr_list);
- for (i = 0; i < buf->rb_max_requests; i++) {
- if (buf->rb_recv_bufs)
- rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
- if (buf->rb_send_bufs)
- rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
+ list_del(&rep->rr_list);
+ rpcrdma_destroy_rep(ia, rep);
}
- ia->ri_ops->ro_destroy(buf);
+ while (!list_empty(&buf->rb_send_bufs)) {
+ struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next,
+ struct rpcrdma_req,
+ rl_free);
- kfree(buf->rb_pool);
+ list_del(&req->rl_free);
+ rpcrdma_destroy_req(ia, req);
+ }
+
+ ia->ri_ops->ro_destroy(buf);
}
struct rpcrdma_mw *
@@ -1102,24 +1082,27 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
}
static void
-rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+rpcrdma_buffer_put_locked(struct rpcrdma_rep *rep, struct rpcrdma_buffer *buf)
{
- buf->rb_send_bufs[--buf->rb_send_index] = req;
- req->rl_niovs = 0;
- if (req->rl_reply) {
- buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
- req->rl_reply = NULL;
- }
+ list_add_tail(&rep->rr_list, &buf->rb_recv_bufs);
+}
+
+static struct rpcrdma_rep *
+rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_rep *rep;
+
+ rep = list_first_entry(&buf->rb_recv_bufs,
+ struct rpcrdma_rep, rr_list);
+ list_del(&rep->rr_list);
+
+ return rep;
}
/*
* Get a set of request/reply buffers.
*
- * Reply buffer (if needed) is attached to send buffer upon return.
- * Rule:
- * rb_send_index and rb_recv_index MUST always be pointing to the
- * *next* available buffer (non-NULL). They are incremented after
- * removing buffers, and decremented *before* returning them.
+ * Reply buffer (if available) is attached to send buffer upon return.
*/
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
@@ -1129,25 +1112,22 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
spin_lock_irqsave(&buffers->rb_lock, flags);
- if (buffers->rb_send_index == buffers->rb_max_requests) {
+ if (list_empty(&buffers->rb_send_bufs)) {
spin_unlock_irqrestore(&buffers->rb_lock, flags);
- dprintk("RPC: %s: out of request buffers\n", __func__);
- return ((struct rpcrdma_req *)NULL);
- }
-
- req = buffers->rb_send_bufs[buffers->rb_send_index];
- if (buffers->rb_send_index < buffers->rb_recv_index) {
- dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
- __func__,
- buffers->rb_recv_index - buffers->rb_send_index);
- req->rl_reply = NULL;
- } else {
- req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
- buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
+ pr_warn("RPC: %s: out of request buffers\n", __func__);
+ return NULL;
}
- buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+ req = list_first_entry(&buffers->rb_send_bufs,
+ struct rpcrdma_req, rl_free);
+ list_del(&req->rl_free);
+ req->rl_reply = NULL;
+ if (!list_empty(&buffers->rb_recv_bufs))
+ req->rl_reply = rpcrdma_buffer_get_locked(buffers);
spin_unlock_irqrestore(&buffers->rb_lock, flags);
+
+ if (!req->rl_reply)
+ pr_warn("RPC: %s: out of reply buffers\n", __func__);
return req;
}
@@ -1159,17 +1139,22 @@ void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
+ struct rpcrdma_rep *rep = req->rl_reply;
unsigned long flags;
+ req->rl_niovs = 0;
+ req->rl_reply = NULL;
+
spin_lock_irqsave(&buffers->rb_lock, flags);
- rpcrdma_buffer_put_sendbuf(req, buffers);
+ list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
+ if (rep)
+ rpcrdma_buffer_put_locked(rep, buffers);
spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
/*
* Recover reply buffers from pool.
- * This happens when recovering from error conditions.
- * Post-increment counter/array index.
+ * This happens when recovering from disconnect.
*/
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
@@ -1178,10 +1163,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
unsigned long flags;
spin_lock_irqsave(&buffers->rb_lock, flags);
- if (buffers->rb_recv_index < buffers->rb_max_requests) {
- req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
- buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
- }
+ if (!list_empty(&buffers->rb_recv_bufs))
+ req->rl_reply = rpcrdma_buffer_get_locked(buffers);
spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
@@ -1196,7 +1179,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
unsigned long flags;
spin_lock_irqsave(&buffers->rb_lock, flags);
- buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
+ rpcrdma_buffer_put_locked(rep, buffers);
spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index a13508b..e6a358f 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -252,6 +252,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
#define RPCRDMA_MAX_IOVS (2)
struct rpcrdma_req {
+ struct list_head rl_free;
unsigned int rl_niovs;
unsigned int rl_nchunks;
unsigned int rl_connect_cookie;
@@ -285,12 +286,10 @@ struct rpcrdma_buffer {
struct list_head rb_all;
char *rb_pool;
- spinlock_t rb_lock; /* protect buf arrays */
+ spinlock_t rb_lock; /* protect buf lists */
+ struct list_head rb_send_bufs;
+ struct list_head rb_recv_bufs;
u32 rb_max_requests;
- int rb_send_index;
- int rb_recv_index;
- struct rpcrdma_req **rb_send_bufs;
- struct rpcrdma_rep **rb_recv_bufs;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
xprt_{setup,destroy}_backchannel() won't be adequate for RPC/RDMA
bi-directional operation. In particular, receive buffers have to be
pre-registered and posted in order to receive incoming backchannel
requests.
Add a virtual function call to allow the insertion of appropriate
backchannel setup and destruction methods for each transport.
In addition, freeing a backchannel request is a little different
for RPC/RDMA. Introduce an rpc_xprt_op to handle the difference.
Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/bc_xprt.h | 5 +++++
include/linux/sunrpc/xprt.h | 3 +++
net/sunrpc/backchannel_rqst.c | 24 ++++++++++++++++++++++--
net/sunrpc/xprtsock.c | 15 +++++++++++++++
4 files changed, 45 insertions(+), 2 deletions(-)
diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h
index 8df43c9f..4397a48 100644
--- a/include/linux/sunrpc/bc_xprt.h
+++ b/include/linux/sunrpc/bc_xprt.h
@@ -38,6 +38,11 @@ void xprt_free_bc_request(struct rpc_rqst *req);
int xprt_setup_backchannel(struct rpc_xprt *, unsigned int min_reqs);
void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs);
+/* Socket backchannel transport methods */
+int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs);
+void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs);
+void xprt_free_bc_rqst(struct rpc_rqst *req);
+
/*
* Determine if a shared backchannel is in use
*/
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 0fb9acb..81e3433 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -136,6 +136,9 @@ struct rpc_xprt_ops {
int (*enable_swap)(struct rpc_xprt *xprt);
void (*disable_swap)(struct rpc_xprt *xprt);
void (*inject_disconnect)(struct rpc_xprt *xprt);
+ int (*bc_setup)(struct rpc_xprt *xprt, unsigned int min_reqs);
+ void (*bc_free_rqst)(struct rpc_rqst *rqst);
+ void (*bc_destroy)(struct rpc_xprt *xprt, unsigned int max_reqs);
};
/*
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 6255d14..c823be6 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -138,6 +138,14 @@ out_free:
*/
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
+ if (!xprt->ops->bc_setup)
+ return -ENOSYS;
+ return xprt->ops->bc_setup(xprt, min_reqs);
+}
+EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
+
+int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
+{
struct rpc_rqst *req;
struct list_head tmp_list;
int i;
@@ -192,7 +200,6 @@ out_free:
dprintk("RPC: setup backchannel transport failed\n");
return -ENOMEM;
}
-EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
/**
* xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
@@ -205,6 +212,13 @@ EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
*/
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
+ if (xprt->ops->bc_destroy)
+ xprt->ops->bc_destroy(xprt, max_reqs);
+}
+EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
+
+void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
+{
struct rpc_rqst *req = NULL, *tmp = NULL;
dprintk("RPC: destroy backchannel transport\n");
@@ -227,7 +241,6 @@ out:
dprintk("RPC: backchannel list empty= %s\n",
list_empty(&xprt->bc_pa_list) ? "true" : "false");
}
-EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
@@ -264,6 +277,13 @@ void xprt_free_bc_request(struct rpc_rqst *req)
{
struct rpc_xprt *xprt = req->rq_xprt;
+ xprt->ops->bc_free_rqst(req);
+}
+
+void xprt_free_bc_rqst(struct rpc_rqst *req)
+{
+ struct rpc_xprt *xprt = req->rq_xprt;
+
dprintk("RPC: free backchannel req=%p\n", req);
req->rq_connect_cookie = xprt->connect_cookie - 1;
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 7be90bc..d2ad732 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2532,6 +2532,11 @@ static struct rpc_xprt_ops xs_local_ops = {
.print_stats = xs_local_print_stats,
.enable_swap = xs_enable_swap,
.disable_swap = xs_disable_swap,
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+ .bc_setup = xprt_setup_bc,
+ .bc_free_rqst = xprt_free_bc_rqst,
+ .bc_destroy = xprt_destroy_bc,
+#endif
};
static struct rpc_xprt_ops xs_udp_ops = {
@@ -2554,6 +2559,11 @@ static struct rpc_xprt_ops xs_udp_ops = {
.enable_swap = xs_enable_swap,
.disable_swap = xs_disable_swap,
.inject_disconnect = xs_inject_disconnect,
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+ .bc_setup = xprt_setup_bc,
+ .bc_free_rqst = xprt_free_bc_rqst,
+ .bc_destroy = xprt_destroy_bc,
+#endif
};
static struct rpc_xprt_ops xs_tcp_ops = {
@@ -2573,6 +2583,11 @@ static struct rpc_xprt_ops xs_tcp_ops = {
.enable_swap = xs_enable_swap,
.disable_swap = xs_disable_swap,
.inject_disconnect = xs_inject_disconnect,
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+ .bc_setup = xprt_setup_bc,
+ .bc_free_rqst = xprt_free_bc_rqst,
+ .bc_destroy = xprt_destroy_bc,
+#endif
};
/*
xprtrdma's backward direction send and receive buffers are the same
size as the forechannel's inline threshold, and must be pre-
registered.
The consumer has no control over which receive buffer the adapter
chooses to catch an incoming backwards-direction call. Any receive
buffer can be used for either a forward reply or a backward call.
Thus both types of RPC message must be the same size.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/Makefile | 1
net/sunrpc/xprtrdma/backchannel.c | 204 +++++++++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/transport.c | 7 +
net/sunrpc/xprtrdma/verbs.c | 92 ++++++++++++++---
net/sunrpc/xprtrdma/xprt_rdma.h | 20 ++++
5 files changed, 309 insertions(+), 15 deletions(-)
create mode 100644 net/sunrpc/xprtrdma/backchannel.c
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 48913de..33f99d3 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
svc_rdma.o svc_rdma_transport.o \
svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
module.o
+rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
new file mode 100644
index 0000000..c0a42ad
--- /dev/null
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -0,0 +1,204 @@
+/*
+ * Copyright (c) 2015 Oracle. All rights reserved.
+ *
+ * Support for backward direction RPCs on RPC/RDMA.
+ */
+
+#include <linux/module.h>
+
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
+ struct rpc_rqst *rqst)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+
+ spin_lock(&buf->rb_reqslock);
+ list_del(&req->rl_all);
+ spin_unlock(&buf->rb_reqslock);
+
+ rpcrdma_destroy_req(&r_xprt->rx_ia, req);
+
+ kfree(rqst);
+}
+
+static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
+ struct rpc_rqst *rqst)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_regbuf *rb;
+ struct rpcrdma_req *req;
+ struct xdr_buf *buf;
+ size_t size;
+
+ req = rpcrdma_create_req(r_xprt);
+ if (!req)
+ return -ENOMEM;
+ req->rl_backchannel = true;
+
+ size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+ rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+ if (IS_ERR(rb))
+ goto out_fail;
+ req->rl_rdmabuf = rb;
+
+ size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+ rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+ if (IS_ERR(rb))
+ goto out_fail;
+ rb->rg_owner = req;
+ req->rl_sendbuf = rb;
+ /* so that rpcr_to_rdmar works when receiving a request */
+ rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
+
+ buf = &rqst->rq_snd_buf;
+ buf->head[0].iov_base = rqst->rq_buffer;
+ buf->head[0].iov_len = 0;
+ buf->tail[0].iov_base = NULL;
+ buf->tail[0].iov_len = 0;
+ buf->page_len = 0;
+ buf->len = 0;
+ buf->buflen = size;
+
+ return 0;
+
+out_fail:
+ rpcrdma_bc_free_rqst(r_xprt, rqst);
+ return -ENOMEM;
+}
+
+/* Allocate and add receive buffers to the rpcrdma_buffer's existing
+ * list of rep's. These are released when the transport is destroyed. */
+static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
+ unsigned int count)
+{
+ struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+ struct rpcrdma_rep *rep;
+ unsigned long flags;
+ int rc = 0;
+
+ while (count--) {
+ rep = rpcrdma_create_rep(r_xprt);
+ if (IS_ERR(rep)) {
+ pr_err("RPC: %s: reply buffer alloc failed\n",
+ __func__);
+ rc = PTR_ERR(rep);
+ break;
+ }
+
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ list_add(&rep->rr_list, &buffers->rb_recv_bufs);
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ }
+
+ return rc;
+}
+
+/**
+ * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of concurrent incoming requests to expect
+ *
+ * Returns 0 on success; otherwise a negative errno
+ */
+int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
+ struct rpc_rqst *rqst;
+ unsigned int i;
+ int rc;
+
+ /* The backchannel reply path returns each rpc_rqst to the
+ * bc_pa_list _after_ the reply is sent. If the server is
+ * faster than the client, it can send another backward
+ * direction request before the rpc_rqst is returned to the
+ * list. The client rejects the request in this case.
+ *
+ * Twice as many rpc_rqsts are prepared to ensure there is
+ * always an rpc_rqst available as soon as a reply is sent.
+ */
+ for (i = 0; i < (reqs << 1); i++) {
+ rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
+ if (!rqst) {
+ pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
+ __func__);
+ goto out_free;
+ }
+
+ rqst->rq_xprt = &r_xprt->rx_xprt;
+ INIT_LIST_HEAD(&rqst->rq_list);
+ INIT_LIST_HEAD(&rqst->rq_bc_list);
+
+ if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
+ goto out_free;
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+ spin_unlock_bh(&xprt->bc_pa_lock);
+ }
+
+ rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
+ if (rc)
+ goto out_free;
+
+ rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
+ if (rc)
+ goto out_free;
+
+ buffer->rb_bc_srv_max_requests = reqs;
+ request_module("svcrdma");
+
+ return 0;
+
+out_free:
+ xprt_rdma_bc_destroy(xprt, reqs);
+
+ pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
+ return -ENOMEM;
+}
+
+/**
+ * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of incoming requests to destroy; ignored
+ */
+void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpc_rqst *rqst, *tmp;
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
+ list_del(&rqst->rq_bc_pa_list);
+ spin_unlock_bh(&xprt->bc_pa_lock);
+
+ rpcrdma_bc_free_rqst(r_xprt, rqst);
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ }
+ spin_unlock_bh(&xprt->bc_pa_lock);
+}
+
+/**
+ * xprt_rdma_bc_free_rqst - Release a backchannel rqst
+ * @rqst: request to release
+ */
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
+{
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+
+ smp_mb__before_atomic();
+ WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
+ clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+ smp_mb__after_atomic();
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+ spin_unlock_bh(&xprt->bc_pa_lock);
+}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index e9e5ed7..e3871a6 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -705,7 +705,12 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
.print_stats = xprt_rdma_print_stats,
.enable_swap = xprt_rdma_enable_swap,
.disable_swap = xprt_rdma_disable_swap,
- .inject_disconnect = xprt_rdma_inject_disconnect
+ .inject_disconnect = xprt_rdma_inject_disconnect,
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ .bc_setup = xprt_rdma_bc_setup,
+ .bc_free_rqst = xprt_rdma_bc_free_rqst,
+ .bc_destroy = xprt_rdma_bc_destroy,
+#endif
};
static struct xprt_class xprt_rdma = {
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 8d99214..1e4a948 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -877,7 +877,22 @@ retry:
}
rc = ep->rep_connected;
} else {
+ struct rpcrdma_xprt *r_xprt;
+ unsigned int extras;
+
dprintk("RPC: %s: connected\n", __func__);
+
+ r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+ extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
+
+ if (extras) {
+ rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
+ if (rc)
+ pr_err("%s: could not post "
+ "extra receive buffers: %i\n",
+ __func__, rc);
+ rc = 0;
+ }
}
out:
@@ -914,20 +929,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
}
}
-static struct rpcrdma_req *
+struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
+ struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
struct rpcrdma_req *req;
req = kzalloc(sizeof(*req), GFP_KERNEL);
if (req == NULL)
return ERR_PTR(-ENOMEM);
+ INIT_LIST_HEAD(&req->rl_free);
+ spin_lock(&buffer->rb_reqslock);
+ list_add(&req->rl_all, &buffer->rb_allreqs);
+ spin_unlock(&buffer->rb_reqslock);
req->rl_buffer = &r_xprt->rx_buf;
return req;
}
-static struct rpcrdma_rep *
+struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -965,6 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
int i, rc;
buf->rb_max_requests = r_xprt->rx_data.max_requests;
+ buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_lock);
rc = ia->ri_ops->ro_init(r_xprt);
@@ -972,6 +993,8 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
goto out;
INIT_LIST_HEAD(&buf->rb_send_bufs);
+ INIT_LIST_HEAD(&buf->rb_allreqs);
+ spin_lock_init(&buf->rb_reqslock);
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
@@ -982,6 +1005,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
rc = PTR_ERR(req);
goto out;
}
+ req->rl_backchannel = false;
list_add(&req->rl_free, &buf->rb_send_bufs);
}
@@ -1008,19 +1032,13 @@ out:
static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
- if (!rep)
- return;
-
rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
kfree(rep);
}
-static void
+void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
- if (!req)
- return;
-
rpcrdma_free_regbuf(ia, req->rl_sendbuf);
rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
kfree(req);
@@ -1040,14 +1058,20 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
rpcrdma_destroy_rep(ia, rep);
}
- while (!list_empty(&buf->rb_send_bufs)) {
- struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next,
+ spin_lock(&buf->rb_reqslock);
+ while (!list_empty(&buf->rb_allreqs)) {
+ struct rpcrdma_req *req = list_entry(buf->rb_allreqs.next,
struct rpcrdma_req,
- rl_free);
+ rl_all);
+
+ list_del(&req->rl_all);
+ spin_unlock(&buf->rb_reqslock);
- list_del(&req->rl_free);
rpcrdma_destroy_req(ia, req);
+
+ spin_lock(&buf->rb_reqslock);
}
+ spin_unlock(&buf->rb_reqslock);
ia->ri_ops->ro_destroy(buf);
}
@@ -1094,7 +1118,7 @@ rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf)
rep = list_first_entry(&buf->rb_recv_bufs,
struct rpcrdma_rep, rr_list);
- list_del(&rep->rr_list);
+ list_del_init(&rep->rr_list);
return rep;
}
@@ -1337,6 +1361,46 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
return rc;
}
+/**
+ * rpcrdma_ep_post_extra_recv - Post buffers to catch incoming backchannel requests
+ * @r_xprt: transport associated with these backchannel resources
+ * @count: minimum number of incoming requests expected
+ *
+ * Returns zero if all requested buffers were posted, or a negative errno.
+ */
+int
+rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+ struct rpcrdma_rep *rep;
+ unsigned long flags;
+ int rc;
+
+ while (count--) {
+ rep = NULL;
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ if (!list_empty(&buffers->rb_recv_bufs))
+ rep = rpcrdma_buffer_get_locked(buffers);
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ if (!rep) {
+ pr_err("%s: no extra receive buffers\n", __func__);
+ return -ENOMEM;
+ }
+
+ rc = rpcrdma_ep_post_recv(ia, ep, rep);
+ if (rc) {
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ rpcrdma_buffer_put_locked(rep, buffers);
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ return rc;
+ }
+ }
+
+ return 0;
+}
+
/* How many chunk list items fit within our inline buffers?
*/
unsigned int
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index e6a358f..2ca0567 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -262,6 +262,9 @@ struct rpcrdma_req {
struct rpcrdma_regbuf *rl_rdmabuf;
struct rpcrdma_regbuf *rl_sendbuf;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
+
+ struct list_head rl_all;
+ bool rl_backchannel;
};
static inline struct rpcrdma_req *
@@ -290,6 +293,10 @@ struct rpcrdma_buffer {
struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
u32 rb_max_requests;
+
+ u32 rb_bc_srv_max_requests;
+ spinlock_t rb_reqslock; /* protect rb_allreqs */
+ struct list_head rb_allreqs;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -410,6 +417,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
/*
* Buffer calls - xprtrdma/verbs.c
*/
+struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
+struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
+void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
@@ -426,6 +436,7 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
struct rpcrdma_regbuf *);
unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
+int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
int frwr_alloc_recovery_wq(void);
void frwr_destroy_recovery_wq(void);
@@ -490,6 +501,15 @@ int rpcrdma_marshal_req(struct rpc_rqst *);
int xprt_rdma_init(void);
void xprt_rdma_cleanup(void);
+/* Backchannel calls - xprtrdma/backchannel.c
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
+int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
+void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
/* Temporary NFS request map cache. Created in svc_rdma.c */
extern struct kmem_cache *svc_rdma_map_cachep;
/* WR context cache. Created in svc_rdma.c */
Pre-allocate extra send and receive Work Requests needed to handle
backchannel receives and sends.
The transport doesn't know how many extra WRs to pre-allocate until
the xprt_setup_backchannel() call, but that's long after the WRs are
allocated during forechannel setup.
So, use a fixed value for now.
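For concreteness, with the constants and checks added below:
RPCRDMA_BACKWARD_WRS is 8 when CONFIG_SUNRPC_BACKCHANNEL is enabled
(0 otherwise), so rpcrdma_ep_create() reserves 8 extra send and 8
extra receive Work Requests, and xprt_rdma_bc_setup() refuses to set
up more than RPCRDMA_BACKWARD_WRS >> 1 == 4 concurrent backchannel
requests.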
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 4 ++++
net/sunrpc/xprtrdma/verbs.c | 14 ++++++++++++--
net/sunrpc/xprtrdma/xprt_rdma.h | 10 ++++++++++
3 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index c0a42ad..f5c7122 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -123,6 +123,9 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
* Twice as many rpc_rqsts are prepared to ensure there is
* always an rpc_rqst available as soon as a reply is sent.
*/
+ if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
+ goto out_err;
+
for (i = 0; i < (reqs << 1); i++) {
rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
if (!rqst) {
@@ -159,6 +162,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
out_free:
xprt_rdma_bc_destroy(xprt, reqs);
+out_err:
pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
return -ENOMEM;
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 1e4a948..133c720 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -614,6 +614,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct ib_device_attr *devattr = &ia->ri_devattr;
struct ib_cq *sendcq, *recvcq;
struct ib_cq_init_attr cq_attr = {};
+ unsigned int max_qp_wr;
int rc, err;
if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
@@ -622,18 +623,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
return -ENOMEM;
}
+ if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
+ dprintk("RPC: %s: insufficient wqe's available\n",
+ __func__);
+ return -ENOMEM;
+ }
+ max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
+
/* check provider's send/recv wr limits */
- if (cdata->max_requests > devattr->max_qp_wr)
- cdata->max_requests = devattr->max_qp_wr;
+ if (cdata->max_requests > max_qp_wr)
+ cdata->max_requests = max_qp_wr;
ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
ep->rep_attr.qp_context = ep;
ep->rep_attr.srq = NULL;
ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
rc = ia->ri_ops->ro_open(ia, ep, cdata);
if (rc)
return rc;
ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
ep->rep_attr.cap.max_recv_sge = 1;
ep->rep_attr.cap.max_inline_data = 0;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 2ca0567..37d0d7f 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -101,6 +101,16 @@ struct rpcrdma_ep {
*/
#define RPCRDMA_IGNORE_COMPLETION (0ULL)
+/* Pre-allocate extra Work Requests for handling backward receives
+ * and sends. This is a fixed value because the Work Queues are
+ * allocated when the forward channel is set up.
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+#define RPCRDMA_BACKWARD_WRS (8)
+#else
+#define RPCRDMA_BACKWARD_WRS (0)
+#endif
+
/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
*
* The below structure appears at the front of a large region of kmalloc'd
Backward direction RPC replies are sent via the client transport's
send_request method, the same way forward direction RPC calls are
sent.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 45 +++++++++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/rpc_rdma.c | 5 ++++
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
3 files changed, 51 insertions(+)
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index f5c7122..cc9c762 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -168,6 +168,51 @@ out_err:
}
/**
+ * rpcrdma_bc_marshal_reply - Send backwards direction reply
+ * @rqst: buffer containing RPC reply data
+ *
+ * Returns zero on success.
+ */
+int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
+{
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ struct rpcrdma_msg *headerp;
+ size_t rpclen;
+
+ headerp = rdmab_to_msg(req->rl_rdmabuf);
+ headerp->rm_xid = rqst->rq_xid;
+ headerp->rm_vers = rpcrdma_version;
+ headerp->rm_credit =
+ cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
+ headerp->rm_type = rdma_msg;
+ headerp->rm_body.rm_chunks[0] = xdr_zero;
+ headerp->rm_body.rm_chunks[1] = xdr_zero;
+ headerp->rm_body.rm_chunks[2] = xdr_zero;
+
+ rpclen = rqst->rq_svec[0].iov_len;
+
+ pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
+ __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
+ pr_info("RPC: %s: RPC/RDMA: %*ph\n",
+ __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
+ pr_info("RPC: %s: RPC: %*ph\n",
+ __func__, (int)rpclen, rqst->rq_svec[0].iov_base);
+
+ req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
+ req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
+ req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
+
+ req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
+ req->rl_send_iov[1].length = rpclen;
+ req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
+
+ req->rl_niovs = 2;
+ return 0;
+}
+
+/**
* xprt_rdma_bc_destroy - Release resources for handling backchannel requests
* @xprt: transport associated with these backchannel resources
* @reqs: number of incoming requests to destroy; ignored
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 287c874..d0dbbf7 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -441,6 +441,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
enum rpcrdma_chunktype rtype, wtype;
struct rpcrdma_msg *headerp;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
+ return rpcrdma_bc_marshal_reply(rqst);
+#endif
+
/*
* rpclen gets amount of data in first buffer, which is the
* pre-registered buffer.
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 37d0d7f..a59ce18 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -516,6 +516,7 @@ void xprt_rdma_cleanup(void);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
Introduce a code path in rpcrdma_reply_handler() to catch incoming
backward direction RPC calls and route them to the ULP's backchannel
server.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 115 +++++++++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/rpc_rdma.c | 41 +++++++++++++
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +
3 files changed, 158 insertions(+)
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index cc9c762..2eee18a 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -5,6 +5,8 @@
*/
#include <linux/module.h>
+#include <linux/sunrpc/xprt.h>
+#include <linux/sunrpc/svc.h>
#include "xprt_rdma.h"
@@ -12,6 +14,8 @@
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
+#define RPCRDMA_BACKCHANNEL_DEBUG
+
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
@@ -251,3 +255,114 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
spin_unlock_bh(&xprt->bc_pa_lock);
}
+
+/**
+ * rpcrdma_bc_receive_call - Handle a backward direction call
+ * @xprt: transport receiving the call
+ * @rep: receive buffer containing the call
+ *
+ * Called in the RPC reply handler, which runs in a tasklet.
+ * Be quick about it.
+ *
+ * Operational assumptions:
+ * o Backchannel credits are ignored, just as the NFS server
+ * forechannel currently does
+ * o The ULP manages a replay cache (eg, NFSv4.1 sessions).
+ * No replay detection is done at the transport level
+ */
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_rep *rep)
+{
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ struct rpcrdma_msg *headerp;
+ struct svc_serv *bc_serv;
+ struct rpcrdma_req *req;
+ struct rpc_rqst *rqst;
+ struct xdr_buf *buf;
+ size_t size;
+ __be32 *p;
+
+ headerp = rdmab_to_msg(rep->rr_rdmabuf);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+ pr_info("RPC: %s: callback XID %08x, length=%u\n",
+ __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
+ pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp);
+#endif
+
+ /* Sanity check:
+ * Need at least enough bytes for RPC/RDMA header, as code
+ * here references the header fields by array offset. Also,
+ * backward calls are always inline, so ensure there
+ * are some bytes beyond the RPC/RDMA header.
+ */
+ if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
+ goto out_short;
+ p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
+ size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
+
+ /* Grab a free bc rqst */
+ spin_lock(&xprt->bc_pa_lock);
+ if (list_empty(&xprt->bc_pa_list)) {
+ spin_unlock(&xprt->bc_pa_lock);
+ goto out_overflow;
+ }
+ rqst = list_first_entry(&xprt->bc_pa_list,
+ struct rpc_rqst, rq_bc_pa_list);
+ list_del(&rqst->rq_bc_pa_list);
+ spin_unlock(&xprt->bc_pa_lock);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+ pr_info("RPC: %s: using rqst %p\n", __func__, rqst);
+#endif
+
+ /* Prepare rqst */
+ rqst->rq_reply_bytes_recvd = 0;
+ rqst->rq_bytes_sent = 0;
+ rqst->rq_xid = headerp->rm_xid;
+ set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+
+ buf = &rqst->rq_rcv_buf;
+ memset(buf, 0, sizeof(*buf));
+ buf->head[0].iov_base = p;
+ buf->head[0].iov_len = size;
+ buf->len = size;
+
+ /* The receive buffer has to be hooked to the rpcrdma_req so that
+ * it can be reposted after the server is done parsing it but just
+ * before sending the backward direction reply. */
+ req = rpcr_to_rdmar(rqst);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+ pr_info("RPC: %s: attaching rep %p to req %p\n",
+ __func__, rep, req);
+#endif
+ req->rl_reply = rep;
+
+ /* Defeat the retransmit detection logic in send_request */
+ req->rl_connect_cookie = 0;
+
+ /* Queue rqst for ULP's callback service */
+ bc_serv = xprt->bc_serv;
+ spin_lock(&bc_serv->sv_cb_lock);
+ list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
+ spin_unlock(&bc_serv->sv_cb_lock);
+
+ wake_up(&bc_serv->sv_cb_waitq);
+
+ r_xprt->rx_stats.bcall_count++;
+ return;
+
+out_overflow:
+ pr_warn("RPC/RDMA backchannel overflow\n");
+ xprt_disconnect_done(xprt);
+ /* This receive buffer gets reposted automatically
+ * when the connection is re-established. */
+ return;
+
+out_short:
+ pr_warn("RPC/RDMA short backward direction call\n");
+
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ xprt_disconnect_done(xprt);
+ else
+ pr_warn("RPC: %s: reposting rep %p\n",
+ __func__, rep);
+}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index d0dbbf7..3830250 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -716,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
spin_unlock_bh(&xprt->transport_lock);
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+/* By convention, backchannel calls arrive via rdma_msg type
+ * messages, and never populate the chunk lists. This makes
+ * the RPC/RDMA header small and fixed in size, so it is
+ * straightforward to check the RPC header's direction field.
+ */
+static bool
+rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
+{
+ __be32 *p = (__be32 *)headerp;
+
+ if (headerp->rm_type != rdma_msg)
+ return false;
+ if (headerp->rm_body.rm_chunks[0] != xdr_zero)
+ return false;
+ if (headerp->rm_body.rm_chunks[1] != xdr_zero)
+ return false;
+ if (headerp->rm_body.rm_chunks[2] != xdr_zero)
+ return false;
+
+ /* sanity */
+ if (p[7] != headerp->rm_xid)
+ return false;
+ /* call direction */
+ if (p[8] != cpu_to_be32(RPC_CALL))
+ return false;
+
+ return true;
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
/*
* This function is called when an async event is posted to
* the connection which changes the connection state. All it
@@ -756,6 +787,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
headerp = rdmab_to_msg(rep->rr_rdmabuf);
if (headerp->rm_vers != rpcrdma_version)
goto out_badversion;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (rpcrdma_is_bcall(headerp))
+ goto out_bcall;
+#endif
/* Get XID and try for a match. */
spin_lock(&xprt->transport_lock);
@@ -877,6 +912,12 @@ out_badstatus:
}
return;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+out_bcall:
+ rpcrdma_bc_receive_call(r_xprt, rep);
+ return;
+#endif
+
out_shortreply:
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index a59ce18..3e513e7c 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -352,6 +352,7 @@ struct rpcrdma_stats {
unsigned long failed_marshal_count;
unsigned long bad_reply_count;
unsigned long nomsg_call_count;
+ unsigned long bcall_count;
};
/*
@@ -516,6 +517,7 @@ void xprt_rdma_cleanup(void);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
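To make the detection convention above concrete, here is a small user-space sketch (an illustration, not kernel code) of the same fixed-offset checks: with all three chunk lists empty, the RPC/RDMA header is seven 32-bit words, so word 7 of the receive buffer is the embedded RPC XID and word 8 is the RPC message direction. The wire constants below are assumptions chosen for the example, not the kernel's definitions.

#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>

#define RDMA_MSG  0	/* assumed value of the rdma_msg message type */
#define RPC_CALL  0	/* assumed value of the RPC CALL direction */

/* Words 0..6: xid, vers, credit, type, three empty chunk lists.
 * Words 7..8: start of the RPC header: xid again, then direction.
 */
static int looks_like_bcall(const uint32_t *p)
{
	if (ntohl(p[3]) != RDMA_MSG)
		return 0;
	if (p[4] || p[5] || p[6])	/* chunk lists must be xdr_zero */
		return 0;
	if (p[7] != p[0])		/* RPC xid must match rm_xid */
		return 0;
	if (ntohl(p[8]) != RPC_CALL)	/* direction must be a call */
		return 0;
	return 1;
}

int main(void)
{
	uint32_t hdr[9];

	memset(hdr, 0, sizeof(hdr));
	hdr[0] = htonl(0x12345678);	/* rm_xid */
	hdr[1] = htonl(1);		/* rm_vers */
	hdr[2] = htonl(8);		/* rm_credit */
	hdr[3] = htonl(RDMA_MSG);	/* rm_type */
	hdr[7] = hdr[0];		/* RPC xid */
	hdr[8] = htonl(RPC_CALL);	/* RPC direction: call */

	printf("backchannel call? %s\n", looks_like_bcall(hdr) ? "yes" : "no");
	return 0;
}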
On NFSv4.1 mount points, the Linux NFS client uses this transport
endpoint to receive backward direction calls and route replies back
to the NFSv4.1 server.
Signed-off-by: Chuck Lever <[email protected]>
Acked-by: "J. Bruce Fields" <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 6 +++
include/linux/sunrpc/xprt.h | 1 +
net/sunrpc/xprtrdma/svc_rdma.c | 6 +++
net/sunrpc/xprtrdma/svc_rdma_transport.c | 58 ++++++++++++++++++++++++++++++
4 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 7ccc961..fb4013e 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -228,9 +228,13 @@ extern void svc_rdma_put_frmr(struct svcxprt_rdma *,
struct svc_rdma_fastreg_mr *);
extern void svc_sq_reap(struct svcxprt_rdma *);
extern void svc_rq_reap(struct svcxprt_rdma *);
-extern struct svc_xprt_class svc_rdma_class;
extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
+extern struct svc_xprt_class svc_rdma_class;
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+extern struct svc_xprt_class svc_rdma_bc_class;
+#endif
+
/* svc_rdma.c */
extern int svc_rdma_init(void);
extern void svc_rdma_cleanup(void);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 81e3433..025198d 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -156,6 +156,7 @@ enum xprt_transports {
XPRT_TRANSPORT_TCP = IPPROTO_TCP,
XPRT_TRANSPORT_BC_TCP = IPPROTO_TCP | XPRT_TRANSPORT_BC,
XPRT_TRANSPORT_RDMA = 256,
+ XPRT_TRANSPORT_BC_RDMA = XPRT_TRANSPORT_RDMA | XPRT_TRANSPORT_BC,
XPRT_TRANSPORT_LOCAL = 257,
};
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 2cd252f..1b7051b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -239,6 +239,9 @@ void svc_rdma_cleanup(void)
unregister_sysctl_table(svcrdma_table_header);
svcrdma_table_header = NULL;
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ svc_unreg_xprt_class(&svc_rdma_bc_class);
+#endif
svc_unreg_xprt_class(&svc_rdma_class);
kmem_cache_destroy(svc_rdma_map_cachep);
kmem_cache_destroy(svc_rdma_ctxt_cachep);
@@ -286,6 +289,9 @@ int svc_rdma_init(void)
/* Register RDMA with the SVC transport switch */
svc_reg_xprt_class(&svc_rdma_class);
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ svc_reg_xprt_class(&svc_rdma_bc_class);
+#endif
return 0;
err1:
kmem_cache_destroy(svc_rdma_map_cachep);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index fcc3eb8..a133b1e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -56,6 +56,7 @@
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
struct net *net,
struct sockaddr *sa, int salen,
@@ -95,6 +96,63 @@ struct svc_xprt_class svc_rdma_class = {
.xcl_ident = XPRT_TRANSPORT_RDMA,
};
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
+ struct sockaddr *, int, int);
+static void svc_rdma_bc_detach(struct svc_xprt *);
+static void svc_rdma_bc_free(struct svc_xprt *);
+
+static struct svc_xprt_ops svc_rdma_bc_ops = {
+ .xpo_create = svc_rdma_bc_create,
+ .xpo_detach = svc_rdma_bc_detach,
+ .xpo_free = svc_rdma_bc_free,
+ .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
+ .xpo_secure_port = svc_rdma_secure_port,
+};
+
+struct svc_xprt_class svc_rdma_bc_class = {
+ .xcl_name = "rdma-bc",
+ .xcl_owner = THIS_MODULE,
+ .xcl_ops = &svc_rdma_bc_ops,
+ .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
+};
+
+static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
+ struct net *net,
+ struct sockaddr *sa, int salen,
+ int flags)
+{
+ struct svcxprt_rdma *cma_xprt;
+ struct svc_xprt *xprt;
+
+ cma_xprt = rdma_create_xprt(serv, 0);
+ if (!cma_xprt)
+ return ERR_PTR(-ENOMEM);
+ xprt = &cma_xprt->sc_xprt;
+
+ svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
+ serv->sv_bc_xprt = xprt;
+
+ dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+ return xprt;
+}
+
+static void svc_rdma_bc_detach(struct svc_xprt *xprt)
+{
+ dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+}
+
+static void svc_rdma_bc_free(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
+ dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+ if (xprt)
+ kfree(rdma);
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
struct svc_rdma_op_ctxt *ctxt;
Allow the use of other transport classes when handling a backward
direction RPC call.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/svc.c | 5 -----
1 file changed, 5 deletions(-)
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index a8f579d..bc5b7b5 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1367,11 +1367,6 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
/* reset result send buffer "put" position */
resv->iov_len = 0;
- if (rqstp->rq_prot != IPPROTO_TCP) {
- printk(KERN_ERR "No support for Non-TCP transports!\n");
- BUG();
- }
-
/*
* Skip the next two words because they've already been
* processed in the transport
Pass the correct backchannel transport class to svc_create_xprt()
when setting up an NFSv4.1 backchannel transport.
Signed-off-by: Chuck Lever <[email protected]>
---
fs/nfs/callback.c | 33 +++++++++++++++++++++------------
include/linux/sunrpc/xprt.h | 1 +
net/sunrpc/xprtrdma/transport.c | 1 +
net/sunrpc/xprtsock.c | 1 +
4 files changed, 24 insertions(+), 12 deletions(-)
diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c
index 75f7c0a..46ed2c5 100644
--- a/fs/nfs/callback.c
+++ b/fs/nfs/callback.c
@@ -99,15 +99,22 @@ nfs4_callback_up(struct svc_serv *serv)
}
#if defined(CONFIG_NFS_V4_1)
-static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
+/*
+ * Create an svc_sock for the back channel service that shares the
+ * fore channel connection.
+ * Returns the input port (0) and sets the svc_serv bc_xprt on success
+ */
+static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net,
+ struct rpc_xprt *xprt)
{
- /*
- * Create an svc_sock for the back channel service that shares the
- * fore channel connection.
- * Returns the input port (0) and sets the svc_serv bc_xprt on success
- */
- return svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
- SVC_SOCK_ANONYMOUS);
+ int ret = -EPROTONOSUPPORT;
+
+ if (xprt->bc_name)
+ ret = svc_create_xprt(serv, xprt->bc_name, net, PF_INET, 0,
+ SVC_SOCK_ANONYMOUS);
+ dprintk("NFS: svc_create_xprt(%s) returned %d\n",
+ xprt->bc_name, ret);
+ return ret;
}
/*
@@ -184,7 +191,8 @@ static inline void nfs_callback_bc_serv(u32 minorversion, struct rpc_xprt *xprt,
xprt->bc_serv = serv;
}
#else
-static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
+static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net,
+ struct rpc_xprt *xprt)
{
return 0;
}
@@ -259,7 +267,8 @@ static void nfs_callback_down_net(u32 minorversion, struct svc_serv *serv, struc
svc_shutdown_net(serv, net);
}
-static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct net *net)
+static int nfs_callback_up_net(int minorversion, struct svc_serv *serv,
+ struct net *net, struct rpc_xprt *xprt)
{
struct nfs_net *nn = net_generic(net, nfs_net_id);
int ret;
@@ -281,7 +290,7 @@ static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct n
break;
case 1:
case 2:
- ret = nfs41_callback_up_net(serv, net);
+ ret = nfs41_callback_up_net(serv, net, xprt);
break;
default:
printk(KERN_ERR "NFS: unknown callback version: %d\n",
@@ -364,7 +373,7 @@ int nfs_callback_up(u32 minorversion, struct rpc_xprt *xprt)
goto err_create;
}
- ret = nfs_callback_up_net(minorversion, serv, net);
+ ret = nfs_callback_up_net(minorversion, serv, net, xprt);
if (ret < 0)
goto err_net;
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 025198d..6156491 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -168,6 +168,7 @@ struct rpc_xprt {
struct sockaddr_storage addr; /* server address */
size_t addrlen; /* size of server address */
int prot; /* IP protocol */
+ char *bc_name; /* backchannel transport */
unsigned long cong; /* current congestion */
unsigned long cwnd; /* congestion window */
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index e3871a6..7d6c06f 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -337,6 +337,7 @@ xprt_setup_rdma(struct xprt_create *args)
/* Ensure xprt->addr holds valid server TCP (not RDMA)
* address, for any side protocols which peek at it */
xprt->prot = IPPROTO_TCP;
+ xprt->bc_name = "rdma-bc";
xprt->addrlen = args->addrlen;
memcpy(&xprt->addr, sap, xprt->addrlen);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index d2ad732..3ff123d 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2851,6 +2851,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
transport = container_of(xprt, struct sock_xprt, xprt);
xprt->prot = IPPROTO_TCP;
+ xprt->bc_name = "tcp-bc";
xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
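A note on the mechanism above: bc_name turns the choice of backchannel class into a simple per-transport string, so the callback setup code no longer hard-codes "tcp-bc". A minimal user-space sketch of that selection (the function name and error value are illustrative stand-ins, not the SUNRPC API):

#include <stdio.h>

struct xprt { const char *bc_name; };

/* Stand-in for svc_create_xprt(serv, xprt->bc_name, ...) */
static int create_backchannel(const struct xprt *xprt)
{
	if (!xprt->bc_name)
		return -1;	/* no backchannel class for this transport */
	printf("creating backchannel with class \"%s\"\n", xprt->bc_name);
	return 0;
}

int main(void)
{
	struct xprt tcp  = { .bc_name = "tcp-bc" };
	struct xprt rdma = { .bc_name = "rdma-bc" };
	struct xprt none = { .bc_name = NULL };

	create_backchannel(&tcp);
	create_backchannel(&rdma);
	if (create_backchannel(&none) < 0)
		printf("transport has no backchannel support\n");
	return 0;
}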
To support backward direction calls, I'm going to add an
svc_rdma_get_context() call in the client RDMA transport.
Because it is called from ->buf_alloc(), it can't sleep waiting for memory.
So add an API that can get a server op_ctxt but won't sleep.
Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 2 ++
net/sunrpc/xprtrdma/svc_rdma_transport.c | 28 +++++++++++++++++++++++-----
2 files changed, 25 insertions(+), 5 deletions(-)
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 6ce7495..2500dd1 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -224,6 +224,8 @@ extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
extern int svc_rdma_post_recv(struct svcxprt_rdma *);
extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
+extern struct svc_rdma_op_ctxt *svc_rdma_get_context_gfp(struct svcxprt_rdma *,
+ gfp_t);
extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
extern struct svc_rdma_req_map *svc_rdma_get_req_map(void);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 23aba30..c4083a3 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -153,17 +153,35 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt)
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
-struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+static void svc_rdma_init_context(struct svcxprt_rdma *xprt,
+ struct svc_rdma_op_ctxt *ctxt)
{
- struct svc_rdma_op_ctxt *ctxt;
-
- ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep,
- GFP_KERNEL | __GFP_NOFAIL);
ctxt->xprt = xprt;
INIT_LIST_HEAD(&ctxt->dto_q);
ctxt->count = 0;
ctxt->frmr = NULL;
atomic_inc(&xprt->sc_ctxt_used);
+}
+
+struct svc_rdma_op_ctxt *svc_rdma_get_context_gfp(struct svcxprt_rdma *xprt,
+ gfp_t flags)
+{
+ struct svc_rdma_op_ctxt *ctxt;
+
+ ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep, flags);
+ if (!ctxt)
+ return NULL;
+ svc_rdma_init_context(xprt, ctxt);
+ return ctxt;
+}
+
+struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_op_ctxt *ctxt;
+
+ ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep,
+ GFP_KERNEL | __GFP_NOFAIL);
+ svc_rdma_init_context(xprt, ctxt);
return ctxt;
}
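As a usage note for the new API above, the point of the _gfp variant is that a caller running in a context that must not sleep passes non-blocking flags and copes with NULL, instead of leaning on __GFP_NOFAIL. A small user-space model of the two allocation styles (not the kernel code; calloc stands in for the slab cache):

#include <stdio.h>
#include <stdlib.h>

struct op_ctxt { int count; };

/* Fallible variant: a caller that must not sleep handles NULL itself. */
static struct op_ctxt *get_context_try(void)
{
	return calloc(1, sizeof(struct op_ctxt));
}

/* "Cannot fail" variant: stands in for GFP_KERNEL | __GFP_NOFAIL,
 * which is only appropriate for callers that are allowed to block.
 */
static struct op_ctxt *get_context(void)
{
	struct op_ctxt *ctxt;

	do {
		ctxt = calloc(1, sizeof(*ctxt));
	} while (!ctxt);
	return ctxt;
}

int main(void)
{
	struct op_ctxt *ctxt;

	ctxt = get_context_try();	/* e.g. a ->buf_alloc style path */
	if (!ctxt) {
		fprintf(stderr, "allocation failed; caller backs off\n");
		return 1;
	}
	free(ctxt);

	ctxt = get_context();		/* e.g. a path that may sleep */
	free(ctxt);
	return 0;
}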
To support the NFSv4.1 backchannel on RDMA connections, add a
mechanism for sending a backwards-direction RPC/RDMA call on a
connection established by a client.
Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 2 +
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 63 +++++++++++++++++++++++++++++++++
2 files changed, 65 insertions(+)
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 2500dd1..42262dd 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -216,6 +216,8 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
extern int svc_rdma_sendto(struct svc_rqst *);
extern struct rpcrdma_read_chunk *
svc_rdma_get_read_chunk(struct rpcrdma_msg *);
+extern int svc_rdma_bc_post_send(struct svcxprt_rdma *,
+ struct svc_rdma_op_ctxt *, struct xdr_buf *);
/* svc_rdma_transport.c */
extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 1dfae83..0bda3a5 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -641,3 +641,66 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
svc_rdma_put_context(ctxt, 0);
return ret;
}
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+/* Send a backwards direction RPC call.
+ *
+ * Caller holds the connection's mutex and has already marshaled the
+ * RPC/RDMA request. Before sending the request, this API also posts
+ * an extra receive buffer to catch the bc reply for this request.
+ */
+int svc_rdma_bc_post_send(struct svcxprt_rdma *rdma,
+ struct svc_rdma_op_ctxt *ctxt, struct xdr_buf *sndbuf)
+{
+ struct svc_rdma_req_map *vec;
+ struct ib_send_wr send_wr;
+ int ret;
+
+ vec = svc_rdma_get_req_map();
+ ret = map_xdr(rdma, sndbuf, vec);
+ if (ret)
+ goto out;
+
+ /* Post a recv buffer to handle reply for this request */
+ ret = svc_rdma_post_recv(rdma);
+ if (ret) {
+ pr_err("svcrdma: Failed to post bc receive buffer, err=%d. "
+ "Closing transport %p.\n", ret, rdma);
+ set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+ ret = -ENOTCONN;
+ goto out;
+ }
+
+ ctxt->wr_op = IB_WR_SEND;
+ ctxt->direction = DMA_TO_DEVICE;
+ ctxt->sge[0].lkey = rdma->sc_dma_lkey;
+ ctxt->sge[0].length = sndbuf->len;
+ ctxt->sge[0].addr =
+ ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
+ sndbuf->len, DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
+ svc_rdma_unmap_dma(ctxt);
+ ret = -EIO;
+ goto out;
+ }
+ atomic_inc(&rdma->sc_dma_used);
+
+ memset(&send_wr, 0, sizeof send_wr);
+ send_wr.wr_id = (unsigned long)ctxt;
+ send_wr.sg_list = ctxt->sge;
+ send_wr.num_sge = 1;
+ send_wr.opcode = IB_WR_SEND;
+ send_wr.send_flags = IB_SEND_SIGNALED;
+
+ ret = svc_rdma_send(rdma, &send_wr);
+ if (ret) {
+ svc_rdma_unmap_dma(ctxt);
+ ret = -EIO;
+ goto out;
+ }
+out:
+ svc_rdma_put_req_map(vec);
+ pr_info("svcrdma: %s returns %d\n", __func__, ret);
+ return ret;
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
Extra resources for handling backchannel requests have to be
pre-allocated when a transport instance is created. Set a limit.
Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 5 +++++
net/sunrpc/xprtrdma/svc_rdma_transport.c | 6 +++++-
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index fb4013e..6ce7495 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -180,6 +180,11 @@ struct svcxprt_rdma {
#define RPCRDMA_SQ_DEPTH_MULT 8
#define RPCRDMA_MAX_REQUESTS 32
#define RPCRDMA_MAX_REQ_SIZE 4096
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+#define RPCRDMA_MAX_BC_REQUESTS 8
+#else
+#define RPCRDMA_MAX_BC_REQUESTS 0
+#endif
#define RPCSVC_MAXPAYLOAD_RDMA RPCSVC_MAXPAYLOAD
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index a133b1e..23aba30 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -935,8 +935,10 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
(size_t)RPCSVC_MAXPAGES);
newxprt->sc_max_sge_rd = min_t(size_t, devattr.max_sge_rd,
RPCSVC_MAXPAGES);
+ /* XXX: what if HCA can't support enough WRs for bc operation? */
newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
- (size_t)svcrdma_max_requests);
+ (size_t)(svcrdma_max_requests +
+ RPCRDMA_MAX_BC_REQUESTS));
newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
/*
@@ -976,7 +978,9 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
qp_attr.event_handler = qp_event_handler;
qp_attr.qp_context = &newxprt->sc_xprt;
qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
+ qp_attr.cap.max_send_wr += RPCRDMA_MAX_BC_REQUESTS;
qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
+ qp_attr.cap.max_recv_wr += RPCRDMA_MAX_BC_REQUESTS;
qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
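To make the sizing above concrete, here is the arithmetic under the stated defaults (32 forward requests, 8 backchannel requests, SQ depth multiplier 8), ignoring the device's max_qp_wr cap; the standalone program below simply prints the resulting values:

#include <stdio.h>

#define RPCRDMA_SQ_DEPTH_MULT	8
#define RPCRDMA_MAX_REQUESTS	32
#define RPCRDMA_MAX_BC_REQUESTS	8

int main(void)
{
	unsigned int max_requests = RPCRDMA_MAX_REQUESTS +
				    RPCRDMA_MAX_BC_REQUESTS;
	unsigned int sq_depth = RPCRDMA_SQ_DEPTH_MULT * max_requests;

	/* Mirrors the qp_attr setup: both queues get backchannel headroom */
	printf("sc_max_requests = %u\n", max_requests);
	printf("sc_sq_depth     = %u\n", sq_depth);
	printf("max_send_wr     = %u\n", sq_depth + RPCRDMA_MAX_BC_REQUESTS);
	printf("max_recv_wr     = %u\n", max_requests + RPCRDMA_MAX_BC_REQUESTS);
	return 0;
}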
To support the NFSv4.1 backchannel on RDMA connections, add a
capability for receiving an RPC/RDMA reply on a connection
established by a client.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 76 +++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 60 ++++++++++++++++++++++++
net/sunrpc/xprtrdma/xprt_rdma.h | 4 ++
3 files changed, 140 insertions(+)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 3830250..b728f6f 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -946,3 +946,79 @@ repost:
if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
rpcrdma_recv_buffer_put(rep);
}
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+
+int
+rpcrdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
+ struct xdr_buf *rcvbuf)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct kvec *dst, *src = &rcvbuf->head[0];
+ struct rpc_rqst *req;
+ unsigned long cwnd;
+ u32 credits;
+ size_t len;
+ __be32 xid;
+ __be32 *p;
+ int ret;
+
+ p = (__be32 *)src->iov_base;
+ len = src->iov_len;
+ xid = rmsgp->rm_xid;
+
+ pr_info("%s: xid=%08x, length=%zu\n",
+ __func__, be32_to_cpu(xid), len);
+ pr_info("%s: RPC/RDMA: %*ph\n",
+ __func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp);
+ pr_info("%s: RPC: %*ph\n",
+ __func__, (int)len, p);
+
+ ret = -EAGAIN;
+ if (src->iov_len < 24)
+ goto out_shortreply;
+
+ spin_lock_bh(&xprt->transport_lock);
+ req = xprt_lookup_rqst(xprt, xid);
+ if (!req)
+ goto out_notfound;
+
+ dst = &req->rq_private_buf.head[0];
+ memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
+ if (dst->iov_len < len)
+ goto out_unlock;
+ memcpy(dst->iov_base, p, len);
+
+ credits = be32_to_cpu(rmsgp->rm_credit);
+ if (credits == 0)
+ credits = 1; /* don't deadlock */
+ else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
+ credits = r_xprt->rx_buf.rb_bc_max_requests;
+
+ cwnd = xprt->cwnd;
+ xprt->cwnd = credits << RPC_CWNDSHIFT;
+ if (xprt->cwnd > cwnd)
+ xprt_release_rqst_cong(req->rq_task);
+
+ ret = 0;
+ xprt_complete_rqst(req->rq_task, rcvbuf->len);
+ rcvbuf->len = 0;
+
+out_unlock:
+ spin_unlock_bh(&xprt->transport_lock);
+out:
+ return ret;
+
+out_shortreply:
+ pr_info("svcrdma: short bc reply: xprt=%p, len=%zu\n",
+ xprt, src->iov_len);
+ goto out;
+
+out_notfound:
+ pr_info("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
+ xprt, be32_to_cpu(xid));
+
+ goto out_unlock;
+}
+
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 5f6ca47..be75abba 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -47,6 +47,7 @@
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>
+#include "xprt_rdma.h"
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
@@ -560,6 +561,42 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
return ret;
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+
+/* By convention, backchannel calls arrive via rdma_msg type
+ * messages, and never populate the chunk lists. This makes
+ * the RPC/RDMA header small and fixed in size, so it is
+ * straightforward to check the RPC header's direction field.
+ */
+static bool
+svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, struct rpcrdma_msg *rmsgp)
+{
+ __be32 *p = (__be32 *)rmsgp;
+
+ if (!xprt->xpt_bc_xprt)
+ return false;
+
+ if (rmsgp->rm_type != rdma_msg)
+ return false;
+ if (rmsgp->rm_body.rm_chunks[0] != xdr_zero)
+ return false;
+ if (rmsgp->rm_body.rm_chunks[1] != xdr_zero)
+ return false;
+ if (rmsgp->rm_body.rm_chunks[2] != xdr_zero)
+ return false;
+
+ /* sanity */
+ if (p[7] != rmsgp->rm_xid)
+ return false;
+ /* call direction */
+ if (p[8] == cpu_to_be32(RPC_CALL))
+ return false;
+
+ return true;
+}
+
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
/*
* Set up the rqstp thread context to point to the RQ buffer. If
* necessary, pull additional data from the client with an RDMA_READ
@@ -625,6 +662,17 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
goto close_out;
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
+ ret = rpcrdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
+ &rqstp->rq_arg);
+ svc_rdma_put_context(ctxt, 0);
+ if (ret)
+ goto repost;
+ return ret;
+ }
+#endif
+
/* Read read-list data. */
ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
if (ret > 0) {
@@ -661,4 +709,16 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
set_bit(XPT_CLOSE, &xprt->xpt_flags);
defer:
return 0;
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+repost:
+ ret = svc_rdma_post_recv(rdma_xprt);
+ if (ret) {
+ pr_info("svcrdma: could not post a receive buffer, err=%d. "
+ "Closing transport %p.\n", ret, rdma_xprt);
+ set_bit(XPT_CLOSE, &rdma_xprt->sc_xprt.xpt_flags);
+ ret = -ENOTCONN;
+ }
+ return ret;
+#endif
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 3e513e7c..45a8b70 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -307,6 +307,8 @@ struct rpcrdma_buffer {
u32 rb_bc_srv_max_requests;
spinlock_t rb_reqslock; /* protect rb_allreqs */
struct list_head rb_allreqs;
+
+ u32 rb_bc_max_requests;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -506,6 +508,8 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
*/
int rpcrdma_marshal_req(struct rpc_rqst *);
+int rpcrdma_handle_bc_reply(struct rpc_xprt *, struct rpcrdma_msg *,
+ struct xdr_buf *);
/* RPC/RDMA module init - xprtrdma/transport.c
*/
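The credit handling in rpcrdma_handle_bc_reply above clamps the advertised credit value before it is turned into a congestion window. A minimal sketch of just that clamping rule (the shift constant is illustrative, not the SUNRPC definition):

#include <stdio.h>

#define CWND_SHIFT	8	/* illustrative: cwnd = credits << shift */

static unsigned long clamp_credits(unsigned long credits,
				   unsigned long bc_max_requests)
{
	if (credits == 0)
		credits = 1;		/* never let the window reach zero */
	else if (credits > bc_max_requests)
		credits = bc_max_requests;
	return credits;
}

int main(void)
{
	unsigned long bc_max = 8;
	unsigned long samples[] = { 0, 3, 100 };
	unsigned int i;

	for (i = 0; i < sizeof(samples) / sizeof(samples[0]); i++) {
		unsigned long c = clamp_credits(samples[i], bc_max);

		printf("advertised %3lu -> credits %lu, cwnd %lu\n",
		       samples[i], c, c << CWND_SHIFT);
	}
	return 0;
}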
To support the server-side of an NFSv4.1 backchannel on RDMA
connections, add a transport class for backwards direction
operation.
Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xprt.h | 1
net/sunrpc/xprt.c | 1
net/sunrpc/xprtrdma/svc_rdma_transport.c | 14 +-
net/sunrpc/xprtrdma/transport.c | 243 ++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/xprt_rdma.h | 2
5 files changed, 256 insertions(+), 5 deletions(-)
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 6156491..4f1b0b6 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -83,6 +83,7 @@ struct rpc_rqst {
__u32 * rq_buffer; /* XDR encode buffer */
size_t rq_callsize,
rq_rcvsize;
+ void * rq_privdata; /* xprt-specific per-rqst data */
size_t rq_xmit_bytes_sent; /* total bytes sent */
size_t rq_reply_bytes_recvd; /* total reply bytes */
/* received */
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index ab5dd62..9480354 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1419,3 +1419,4 @@ void xprt_put(struct rpc_xprt *xprt)
if (atomic_dec_and_test(&xprt->count))
xprt_destroy(xprt);
}
+EXPORT_SYMBOL_GPL(xprt_put);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index c4083a3..6bd4c1e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -1194,12 +1194,14 @@ static void __svc_rdma_free(struct work_struct *work)
{
struct svcxprt_rdma *rdma =
container_of(work, struct svcxprt_rdma, sc_work);
- dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+ struct svc_xprt *xprt = &rdma->sc_xprt;
+
+ dprintk("svcrdma: %s(%p)\n", __func__, rdma);
/* We should only be called from kref_put */
- if (atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0)
+ if (atomic_read(&xprt->xpt_ref.refcount) != 0)
pr_err("svcrdma: sc_xprt still in use? (%d)\n",
- atomic_read(&rdma->sc_xprt.xpt_ref.refcount));
+ atomic_read(&xprt->xpt_ref.refcount));
/*
* Destroy queued, but not processed read completions. Note
@@ -1234,6 +1236,12 @@ static void __svc_rdma_free(struct work_struct *work)
pr_err("svcrdma: dma still in use? (%d)\n",
atomic_read(&rdma->sc_dma_used));
+ /* Final put of backchannel client transport */
+ if (xprt->xpt_bc_xprt) {
+ xprt_put(xprt->xpt_bc_xprt);
+ xprt->xpt_bc_xprt = NULL;
+ }
+
/* De-allocate fastreg mr */
rdma_dealloc_frmr_q(rdma);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 7d6c06f..1030425 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -51,6 +51,7 @@
#include <linux/slab.h>
#include <linux/seq_file.h>
#include <linux/sunrpc/addr.h>
+#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"
@@ -148,7 +149,10 @@ static struct ctl_table sunrpc_table[] = {
#define RPCRDMA_MAX_REEST_TO (30U * HZ)
#define RPCRDMA_IDLE_DISC_TO (5U * 60 * HZ)
-static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */
+static struct rpc_xprt_ops xprt_rdma_procs;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static struct rpc_xprt_ops xprt_rdma_bc_procs;
+#endif
static void
xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
@@ -500,7 +504,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
if (req == NULL)
return NULL;
- flags = GFP_NOIO | __GFP_NOWARN;
+ flags = RPCRDMA_DEF_GFP;
if (RPC_IS_SWAPPER(task))
flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
@@ -685,6 +689,199 @@ xprt_rdma_disable_swap(struct rpc_xprt *xprt)
{
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+
+/* Server-side transport endpoint wants a whole page for its send
+ * buffer. The client RPC code constructs the RPC header in this
+ * buffer before it invokes ->send_request.
+ */
+static void *
+xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
+{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ struct svc_rdma_op_ctxt *ctxt;
+ struct svcxprt_rdma *rdma;
+ struct svc_xprt *sxprt;
+ struct page *page;
+
+ if (size > PAGE_SIZE) {
+ WARN_ONCE(1, "failed to handle buffer allocation (size %zu)\n",
+ size);
+ return NULL;
+ }
+
+ page = alloc_page(RPCRDMA_DEF_GFP);
+ if (!page)
+ return NULL;
+
+ sxprt = rqst->rq_xprt->bc_xprt;
+ rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
+ ctxt = svc_rdma_get_context_gfp(rdma, RPCRDMA_DEF_GFP);
+ if (!ctxt) {
+ put_page(page);
+ return NULL;
+ }
+
+ rqst->rq_privdata = ctxt;
+ ctxt->pages[0] = page;
+ ctxt->count = 1;
+ return page_address(page);
+}
+
+static void
+xprt_rdma_bc_free(void *buffer)
+{
+ /* No-op: ctxt and page have already been freed. */
+}
+
+static int
+rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
+{
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)rqst->rq_buffer;
+ struct svc_rdma_op_ctxt *ctxt;
+ int rc;
+
+ /* Space in the send buffer for an RPC/RDMA header is reserved
+ * via xprt->tsh_size */
+ headerp->rm_xid = rqst->rq_xid;
+ headerp->rm_vers = rpcrdma_version;
+ headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
+ headerp->rm_type = rdma_msg;
+ headerp->rm_body.rm_chunks[0] = xdr_zero;
+ headerp->rm_body.rm_chunks[1] = xdr_zero;
+ headerp->rm_body.rm_chunks[2] = xdr_zero;
+
+ pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
+
+ ctxt = (struct svc_rdma_op_ctxt *)rqst->rq_privdata;
+ rc = svc_rdma_bc_post_send(rdma, ctxt, &rqst->rq_snd_buf);
+ if (rc)
+ goto drop_connection;
+ return rc;
+
+drop_connection:
+ pr_info("Failed to send backwards request\n");
+ svc_rdma_put_context(ctxt, 1);
+ xprt_disconnect_done(xprt);
+ return -ENOTCONN;
+}
+
+/* Take an RPC request and send it on the passive end of a
+ * transport connection.
+ */
+static int
+xprt_rdma_bc_send_request(struct rpc_task *task)
+{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
+ struct svcxprt_rdma *rdma;
+ u32 len;
+
+ pr_info("%s: sending request with xid: %08x\n",
+ __func__, be32_to_cpu(rqst->rq_xid));
+
+ if (!mutex_trylock(&sxprt->xpt_mutex)) {
+ rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
+ if (!mutex_trylock(&sxprt->xpt_mutex))
+ return -EAGAIN;
+ rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
+ }
+
+ len = -ENOTCONN;
+ rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
+ if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
+ len = rpcrdma_bc_send_request(rdma, rqst);
+
+ mutex_unlock(&sxprt->xpt_mutex);
+
+ if (len < 0)
+ return len;
+ return 0;
+}
+
+static void
+xprt_rdma_bc_close(struct rpc_xprt *xprt)
+{
+ pr_info("RPC: %s: xprt %p\n", __func__, xprt);
+}
+
+static void
+xprt_rdma_bc_put(struct rpc_xprt *xprt)
+{
+ pr_info("RPC: %s: xprt %p\n", __func__, xprt);
+
+ xprt_free(xprt);
+ module_put(THIS_MODULE);
+}
+
+/* It shouldn't matter if the number of backchannel session slots
+ * doesn't match the number of RPC/RDMA credits. That just means
+ * one or the other will have extra slots that aren't used.
+ */
+static struct rpc_xprt *
+xprt_setup_rdma_bc(struct xprt_create *args)
+{
+ struct rpc_xprt *xprt;
+ struct rpcrdma_xprt *new_xprt;
+
+ if (args->addrlen > sizeof(xprt->addr)) {
+ dprintk("RPC: %s: address too large\n", __func__);
+ return ERR_PTR(-EBADF);
+ }
+
+ xprt = xprt_alloc(args->net, sizeof(*new_xprt),
+ RPCRDMA_MAX_BC_REQUESTS,
+ RPCRDMA_MAX_BC_REQUESTS);
+ if (xprt == NULL) {
+ dprintk("RPC: %s: couldn't allocate rpc_xprt\n",
+ __func__);
+ return ERR_PTR(-ENOMEM);
+ }
+
+ xprt->timeout = &xprt_rdma_default_timeout;
+ xprt_set_bound(xprt);
+ xprt_set_connected(xprt);
+ xprt->bind_timeout = RPCRDMA_BIND_TO;
+ xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
+ xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
+
+ xprt->prot = XPRT_TRANSPORT_BC_RDMA;
+ xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
+ xprt->ops = &xprt_rdma_bc_procs;
+
+ memcpy(&xprt->addr, args->dstaddr, args->addrlen);
+ xprt->addrlen = args->addrlen;
+ xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
+ xprt->resvport = 0;
+
+ xprt->max_payload = xprt_rdma_max_inline_read;
+
+ new_xprt = rpcx_to_rdmax(xprt);
+ new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;
+
+ xprt_get(xprt);
+ args->bc_xprt->xpt_bc_xprt = xprt;
+ xprt->bc_xprt = args->bc_xprt;
+
+ if (!try_module_get(THIS_MODULE))
+ goto out_fail;
+
+ /* Final put for backchannel xprt is in __svc_rdma_free */
+ xprt_get(xprt);
+ return xprt;
+
+out_fail:
+ xprt_rdma_free_addresses(xprt);
+ args->bc_xprt->xpt_bc_xprt = NULL;
+ xprt_put(xprt);
+ xprt_free(xprt);
+ return ERR_PTR(-EINVAL);
+}
+
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
/*
* Plumbing for rpc transport switch and kernel module
*/
@@ -722,6 +919,32 @@ static struct xprt_class xprt_rdma = {
.setup = xprt_setup_rdma,
};
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+
+static struct rpc_xprt_ops xprt_rdma_bc_procs = {
+ .reserve_xprt = xprt_reserve_xprt_cong,
+ .release_xprt = xprt_release_xprt_cong,
+ .alloc_slot = xprt_alloc_slot,
+ .release_request = xprt_release_rqst_cong,
+ .buf_alloc = xprt_rdma_bc_allocate,
+ .buf_free = xprt_rdma_bc_free,
+ .send_request = xprt_rdma_bc_send_request,
+ .set_retrans_timeout = xprt_set_retrans_timeout_def,
+ .close = xprt_rdma_bc_close,
+ .destroy = xprt_rdma_bc_put,
+ .print_stats = xprt_rdma_print_stats
+};
+
+static struct xprt_class xprt_rdma_bc = {
+ .list = LIST_HEAD_INIT(xprt_rdma_bc.list),
+ .name = "rdma backchannel",
+ .owner = THIS_MODULE,
+ .ident = XPRT_TRANSPORT_BC_RDMA,
+ .setup = xprt_setup_rdma_bc,
+};
+
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
void xprt_rdma_cleanup(void)
{
int rc;
@@ -739,6 +962,13 @@ void xprt_rdma_cleanup(void)
__func__, rc);
frwr_destroy_recovery_wq();
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ rc = xprt_unregister_transport(&xprt_rdma_bc);
+ if (rc)
+ dprintk("RPC: %s: xprt_unregister(bc) returned %i\n",
+ __func__, rc);
+#endif
}
int xprt_rdma_init(void)
@@ -755,6 +985,15 @@ int xprt_rdma_init(void)
return rc;
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ rc = xprt_register_transport(&xprt_rdma_bc);
+ if (rc) {
+ xprt_unregister_transport(&xprt_rdma);
+ frwr_destroy_recovery_wq();
+ return rc;
+ }
+#endif
+
dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
dprintk("Defaults:\n");
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 45a8b70..b4855de 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -148,6 +148,8 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
return (struct rpcrdma_msg *)rb->rg_base;
}
+#define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN)
+
/*
* struct rpcrdma_rep -- this structure encapsulates state required to recv
* and complete a reply, asychronously. It needs several pieces of
On Fri, Sep 18, 2015 at 2:14 AM, Chuck Lever <[email protected]> wrote:
>
> Commit 8301a2c047cc ("xprtrdma: Limit work done by completion
> handler") was supposed to prevent xprtrdma's upcall handlers from
> starving other softIRQ work by letting them return to the provider
> before all CQEs have been polled.
>
> The logic assumes the provider will call the upcall handler again
> immediately if the CQ is re-armed while there are still queued CQEs.
>
> This assumption is invalid. The IBTA spec says that after a CQ is
> armed, the hardware must interrupt only when a new CQE is inserted.
> xprtrdma can't rely on the provider calling again, even though some
> providers do.
>
> Therefore, leaving CQEs on queue makes sense only when there is
> another mechanism that ensures all remaining CQEs are consumed in a
> timely fashion. xprtrdma does not have such a mechanism. If a CQE
> remains queued, the transport can wait forever to send the next RPC.
>
> Finally, move the wcs array back onto the stack to ensure that the
> poll array is always local to the CPU where the completion upcall is
> running.
>
> Fixes: 8301a2c047cc ("xprtrdma: Limit work done by completion ...")
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/verbs.c | 100 ++++++++++++++++++---------------------
> net/sunrpc/xprtrdma/xprt_rdma.h | 5 --
> 2 files changed, 45 insertions(+), 60 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index 8a477e2..f2e3863 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -158,34 +158,37 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
> }
> }
>
> -static int
> +/* The wc array is on stack: automatic memory is always CPU-local.
> + *
> + * The common case is a single completion is ready. By asking
> + * for two entries, a return code of 1 means there is exactly
> + * one completion and no more. We don't have to poll again to
> + * know that the CQ is now empty.
> + */
> +static void
> rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
> {
> - struct ib_wc *wcs;
> - int budget, count, rc;
> + struct ib_wc *pos, wcs[2];
> + int count, rc;
>
> - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
> do {
> - wcs = ep->rep_send_wcs;
> + pos = wcs;
>
> - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
> - if (rc <= 0)
> - return rc;
> + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
> + if (rc < 0)
> + goto out_warn;
>
> count = rc;
> while (count-- > 0)
> - rpcrdma_sendcq_process_wc(wcs++);
> - } while (rc == RPCRDMA_POLLSIZE && --budget);
> - return 0;
> + rpcrdma_sendcq_process_wc(pos++);
> + } while (rc == ARRAY_SIZE(wcs));
I think I have missed something and not able to understand the reason
for polling 2 CQEs in one poll? It is possible that in a given poll_cq
call you end up getting on 1 completion, the other completion is
delayed due to some reason. Would it be better to poll for 1 in every
poll call Or
otherwise have this
while ( rc <= ARRAY_SIZE(wcs) && rc);
> + return;
> +
> +out_warn:
> + pr_warn("RPC: %s: ib_poll_cq() failed %i\n", __func__, rc);
> }
>
> -/*
> - * Handle send, fast_reg_mr, and local_inv completions.
> - *
> - * Send events are typically suppressed and thus do not result
> - * in an upcall. Occasionally one is signaled, however. This
> - * prevents the provider's completion queue from wrapping and
> - * losing a completion.
> +/* Handle provider send completion upcalls.
> */
> static void
> rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
> @@ -193,12 +196,7 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
> struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
> int rc;
>
> - rc = rpcrdma_sendcq_poll(cq, ep);
> - if (rc) {
> - dprintk("RPC: %s: ib_poll_cq failed: %i\n",
> - __func__, rc);
> - return;
> - }
> + rpcrdma_sendcq_poll(cq, ep);
>
> rc = ib_req_notify_cq(cq,
> IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
> @@ -247,44 +245,41 @@ out_fail:
> goto out_schedule;
> }
>
> -static int
> +/* The wc array is on stack: automatic memory is always CPU-local.
> + *
> + * struct ib_wc is 64 bytes, making the poll array potentially
> + * large. But this is at the bottom of the call chain. Further
> + * substantial work is done in another thread.
> + */
> +static void
> rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
> {
> - struct list_head sched_list;
> - struct ib_wc *wcs;
> - int budget, count, rc;
> + struct ib_wc *pos, wcs[4];
> + LIST_HEAD(sched_list);
> + int count, rc;
>
> - INIT_LIST_HEAD(&sched_list);
> - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
> do {
> - wcs = ep->rep_recv_wcs;
> + pos = wcs;
>
> - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
> - if (rc <= 0)
> - goto out_schedule;
> + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
> + if (rc < 0)
> + goto out_warn;
>
> count = rc;
> while (count-- > 0)
> - rpcrdma_recvcq_process_wc(wcs++, &sched_list);
> - } while (rc == RPCRDMA_POLLSIZE && --budget);
> - rc = 0;
> + rpcrdma_recvcq_process_wc(pos++, &sched_list);
> + } while (rc == ARRAY_SIZE(wcs));
>
> out_schedule:
> rpcrdma_schedule_tasklet(&sched_list);
> - return rc;
> + return;
> +
> +out_warn:
> + pr_warn("RPC: %s: ib_poll_cq() failed %i\n", __func__, rc);
> + goto out_schedule;
> }
>
> -/*
> - * Handle receive completions.
> - *
> - * It is reentrant but processes single events in order to maintain
> - * ordering of receives to keep server credits.
> - *
> - * It is the responsibility of the scheduled tasklet to return
> - * recv buffers to the pool. NOTE: this affects synchronization of
> - * connection shutdown. That is, the structures required for
> - * the completion of the reply handler must remain intact until
> - * all memory has been reclaimed.
> +/* Handle provider receive completion upcalls.
> */
> static void
> rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
> @@ -292,12 +287,7 @@ rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
> struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
> int rc;
>
> - rc = rpcrdma_recvcq_poll(cq, ep);
> - if (rc) {
> - dprintk("RPC: %s: ib_poll_cq failed: %i\n",
> - __func__, rc);
> - return;
> - }
> + rpcrdma_recvcq_poll(cq, ep);
>
> rc = ib_req_notify_cq(cq,
> IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index c09414e..42c8d44 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -77,9 +77,6 @@ struct rpcrdma_ia {
> * RDMA Endpoint -- one per transport instance
> */
>
> -#define RPCRDMA_WC_BUDGET (128)
> -#define RPCRDMA_POLLSIZE (16)
> -
> struct rpcrdma_ep {
> atomic_t rep_cqcount;
> int rep_cqinit;
> @@ -89,8 +86,6 @@ struct rpcrdma_ep {
> struct rdma_conn_param rep_remote_cma;
> struct sockaddr_storage rep_remote_addr;
> struct delayed_work rep_connect_worker;
> - struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
> - struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
> };
>
> /*
>
> --
Hi Devesh-
On Sep 18, 2015, at 2:52 AM, Devesh Sharma <[email protected]> wrote:
> On Fri, Sep 18, 2015 at 2:14 AM, Chuck Lever <[email protected]> wrote:
>>
>> Commit 8301a2c047cc ("xprtrdma: Limit work done by completion
>> handler") was supposed to prevent xprtrdma's upcall handlers from
>> starving other softIRQ work by letting them return to the provider
>> before all CQEs have been polled.
>>
>> The logic assumes the provider will call the upcall handler again
>> immediately if the CQ is re-armed while there are still queued CQEs.
>>
>> This assumption is invalid. The IBTA spec says that after a CQ is
>> armed, the hardware must interrupt only when a new CQE is inserted.
>> xprtrdma can't rely on the provider calling again, even though some
>> providers do.
>>
>> Therefore, leaving CQEs on queue makes sense only when there is
>> another mechanism that ensures all remaining CQEs are consumed in a
>> timely fashion. xprtrdma does not have such a mechanism. If a CQE
>> remains queued, the transport can wait forever to send the next RPC.
>>
>> Finally, move the wcs array back onto the stack to ensure that the
>> poll array is always local to the CPU where the completion upcall is
>> running.
>>
>> Fixes: 8301a2c047cc ("xprtrdma: Limit work done by completion ...")
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> net/sunrpc/xprtrdma/verbs.c | 100 ++++++++++++++++++---------------------
>> net/sunrpc/xprtrdma/xprt_rdma.h | 5 --
>> 2 files changed, 45 insertions(+), 60 deletions(-)
>>
>> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
>> index 8a477e2..f2e3863 100644
>> --- a/net/sunrpc/xprtrdma/verbs.c
>> +++ b/net/sunrpc/xprtrdma/verbs.c
>> @@ -158,34 +158,37 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
>> }
>> }
>>
>> -static int
>> +/* The wc array is on stack: automatic memory is always CPU-local.
>> + *
>> + * The common case is a single completion is ready. By asking
>> + * for two entries, a return code of 1 means there is exactly
>> + * one completion and no more. We don't have to poll again to
>> + * know that the CQ is now empty.
>> + */
>> +static void
>> rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
>> {
>> - struct ib_wc *wcs;
>> - int budget, count, rc;
>> + struct ib_wc *pos, wcs[2];
>> + int count, rc;
>>
>> - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
>> do {
>> - wcs = ep->rep_send_wcs;
>> + pos = wcs;
>>
>> - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
>> - if (rc <= 0)
>> - return rc;
>> + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
>> + if (rc < 0)
>> + goto out_warn;
>>
>> count = rc;
>> while (count-- > 0)
>> - rpcrdma_sendcq_process_wc(wcs++);
>> - } while (rc == RPCRDMA_POLLSIZE && --budget);
>> - return 0;
>> + rpcrdma_sendcq_process_wc(pos++);
>> + } while (rc == ARRAY_SIZE(wcs));
>
> I think I have missed something and am not able to understand the reason
> for polling 2 CQEs in one poll?
See the block comment above.
When ib_poll_cq() returns the same number of WCs as the
consumer requested, there may still be CQEs waiting to
be polled. Another call to ib_poll_cq() is needed to find
out if that's the case.
When ib_poll_cq() returns fewer WCs than the consumer
requested, the consumer doesn't have to call again to
know that the CQ is empty.
The common case, by far, is that one CQE is ready. By
requesting two, the number returned is less than the
number requested, and the consumer can tell immediately
that the CQE is drained. The extra ib_poll_cq call is
avoided.
Note that the existing logic also relies on polling
multiple WCs at once, and stopping the loop when the
number of returned WCs is less than the size of the
array.
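A user-space model of that drain loop may make the point clearer: with a two-entry array, a return value of 2 means "poll again", while 0 or 1 means the queue was empty once this batch was consumed, so no extra poll call is needed. The fake poll function below merely stands in for ib_poll_cq; none of this is the verbs API.

#include <stdio.h>

static int pending = 5;		/* pretend five CQEs are queued */

/* Stand-in for ib_poll_cq(): returns up to num_entries completions */
static int fake_poll_cq(int *wc, int num_entries)
{
	int n = pending < num_entries ? pending : num_entries;
	int i;

	for (i = 0; i < n; i++)
		wc[i] = --pending;
	return n;
}

int main(void)
{
	int wcs[2];
	int rc, i, polls = 0;

	do {
		rc = fake_poll_cq(wcs, 2);
		polls++;
		for (i = 0; i < rc; i++)
			printf("processed completion %d\n", wcs[i]);
	} while (rc == 2);	/* a full batch means there may be more */

	printf("drained in %d poll calls\n", polls);
	return 0;
}

With a single queued CQE (the common case), the first call returns 1 and the loop exits immediately, which is exactly the saved round trip described above.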
> It is possible that in a given poll_cq
> call you end up getting only 1 completion, the other completion being
> delayed due to some reason.
If a CQE is allowed to be delayed, how does polling
again guarantee that the consumer can retrieve it?
What happens if a signal occurs, there is only one CQE,
but it is delayed? ib_poll_cq would return 0 in that
case, and the consumer would never call again, thinking
the CQ is empty. There's no way the consumer can know
for sure when a CQ is drained.
If the delayed CQE happens only when there is more
than one CQE, how can polling multiple WCs ever work
reliably?
Maybe I don't understand what is meant by delayed.
> Would it be better to poll for 1 in every
> poll call Or
> otherwise have this
> while ( rc <= ARRAY_SIZE(wcs) && rc);
--
Chuck Lever
>> It is possible that in a given poll_cq
>> call you end up getting only 1 completion, the other completion being
>> delayed due to some reason.
>
> If a CQE is allowed to be delayed, how does polling
> again guarantee that the consumer can retrieve it?
>
> What happens if a signal occurs, there is only one CQE,
> but it is delayed? ib_poll_cq would return 0 in that
> case, and the consumer would never call again, thinking
> the CQ is empty. There's no way the consumer can know
> for sure when a CQ is drained.
>
> If the delayed CQE happens only when there is more
> than one CQE, how can polling multiple WCs ever work
> reliably?
>
> Maybe I don't understand what is meant by delayed.
>
If I'm not mistaken, Devesh meant that if, between the ib_poll_cq call
(where you polled the last 2 wcs) and the while statement, another CQE
was generated, then you lose a bit of efficiency. Correct?
>
>> Would it be better to poll for 1 in every
>> poll call Or
>> otherwise have this
>> while ( rc <= ARRAY_SIZE(wcs) && rc);
>>
On 9/17/2015 11:44 PM, Chuck Lever wrote:
> The rb_send_bufs and rb_recv_bufs arrays are used to implement a
> pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep
> structs. Replace those arrays with free lists.
>
> To allow more than 512 RPCs in-flight at once, each of these arrays
> would be larger than a page (assuming 8-byte addresses and 4KB
> pages). Allowing up to 64K in-flight RPCs (as TCP now does), each
> buffer array would have to be 128 pages. That's an order-6
> allocation. (Not that we're going there.)
>
> A list is easier to expand dynamically. Instead of allocating a
> larger array of pointers and copying the existing pointers to the
> new array, simply append more buffers to each list.
>
> This also makes it simpler to manage receive buffers that might
> catch backwards-direction calls, or to post receive buffers in
> bulk to amortize the overhead of ib_post_recv.
>
> Signed-off-by: Chuck Lever <[email protected]>
Hi Chuck,
I get the idea of this patch, but it is a bit confusing (to a
non-educated reader).
Can you explain why sometimes you call put/get_locked routines
and sometimes you open-code them? And is it mandatory to have
the callers lock before calling get/put? Perhaps the code would
be simpler if the get/put routines would take care of locking
since rb_lock looks dedicated to them.
> ---
> net/sunrpc/xprtrdma/verbs.c | 141 +++++++++++++++++----------------------
> net/sunrpc/xprtrdma/xprt_rdma.h | 9 +-
> 2 files changed, 66 insertions(+), 84 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index ac1345b..8d99214 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -962,44 +962,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
> {
> struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
> struct rpcrdma_ia *ia = &r_xprt->rx_ia;
> - struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
> - char *p;
> - size_t len;
> int i, rc;
>
> - buf->rb_max_requests = cdata->max_requests;
> + buf->rb_max_requests = r_xprt->rx_data.max_requests;
> spin_lock_init(&buf->rb_lock);
>
> - /* Need to allocate:
> - * 1. arrays for send and recv pointers
> - * 2. arrays of struct rpcrdma_req to fill in pointers
> - * 3. array of struct rpcrdma_rep for replies
> - * Send/recv buffers in req/rep need to be registered
> - */
> - len = buf->rb_max_requests *
> - (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
> -
> - p = kzalloc(len, GFP_KERNEL);
> - if (p == NULL) {
> - dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
> - __func__, len);
> - rc = -ENOMEM;
> - goto out;
> - }
> - buf->rb_pool = p; /* for freeing it later */
> -
> - buf->rb_send_bufs = (struct rpcrdma_req **) p;
> - p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
> - buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
> - p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
> -
> rc = ia->ri_ops->ro_init(r_xprt);
> if (rc)
> goto out;
>
> + INIT_LIST_HEAD(&buf->rb_send_bufs);
> for (i = 0; i < buf->rb_max_requests; i++) {
> struct rpcrdma_req *req;
> - struct rpcrdma_rep *rep;
>
> req = rpcrdma_create_req(r_xprt);
> if (IS_ERR(req)) {
> @@ -1008,7 +982,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
> rc = PTR_ERR(req);
> goto out;
> }
> - buf->rb_send_bufs[i] = req;
> + list_add(&req->rl_free, &buf->rb_send_bufs);
> + }
> +
> + INIT_LIST_HEAD(&buf->rb_recv_bufs);
> + for (i = 0; i < buf->rb_max_requests + 2; i++) {
> + struct rpcrdma_rep *rep;
>
> rep = rpcrdma_create_rep(r_xprt);
> if (IS_ERR(rep)) {
> @@ -1017,7 +996,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
> rc = PTR_ERR(rep);
> goto out;
> }
> - buf->rb_recv_bufs[i] = rep;
> + list_add(&rep->rr_list, &buf->rb_recv_bufs);
> }
>
> return 0;
> @@ -1051,25 +1030,26 @@ void
> rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
> {
> struct rpcrdma_ia *ia = rdmab_to_ia(buf);
> - int i;
>
> - /* clean up in reverse order from create
> - * 1. recv mr memory (mr free, then kfree)
> - * 2. send mr memory (mr free, then kfree)
> - * 3. MWs
> - */
> - dprintk("RPC: %s: entering\n", __func__);
> + while (!list_empty(&buf->rb_recv_bufs)) {
> + struct rpcrdma_rep *rep = list_entry(buf->rb_recv_bufs.next,
> + struct rpcrdma_rep,
> + rr_list);
>
> - for (i = 0; i < buf->rb_max_requests; i++) {
> - if (buf->rb_recv_bufs)
> - rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
> - if (buf->rb_send_bufs)
> - rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
> + list_del(&rep->rr_list);
> + rpcrdma_destroy_rep(ia, rep);
> }
>
> - ia->ri_ops->ro_destroy(buf);
> + while (!list_empty(&buf->rb_send_bufs)) {
> + struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next,
> + struct rpcrdma_req,
> + rl_free);
>
> - kfree(buf->rb_pool);
> + list_del(&req->rl_free);
> + rpcrdma_destroy_req(ia, req);
> + }
> +
> + ia->ri_ops->ro_destroy(buf);
> }
>
> struct rpcrdma_mw *
> @@ -1102,24 +1082,27 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
> }
>
> static void
> -rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
> +rpcrdma_buffer_put_locked(struct rpcrdma_rep *rep, struct rpcrdma_buffer *buf)
> {
> - buf->rb_send_bufs[--buf->rb_send_index] = req;
> - req->rl_niovs = 0;
> - if (req->rl_reply) {
> - buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
> - req->rl_reply = NULL;
> - }
> + list_add_tail(&rep->rr_list, &buf->rb_recv_bufs);
> +}
> +
> +static struct rpcrdma_rep *
> +rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf)
> +{
> + struct rpcrdma_rep *rep;
> +
> + rep = list_first_entry(&buf->rb_recv_bufs,
> + struct rpcrdma_rep, rr_list);
> + list_del(&rep->rr_list);
> +
> + return rep;
> }
There seems to be an asymmetry in how the send and recv buffers are
handled. Would it make sense to handle both symmetrically, for example
as sketched below?
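Something along these lines, i.e. a matching pair of helpers for the
send side (hypothetical names, only meant to illustrate the symmetry;
callers would still check list_empty() under rb_lock first, just as
rpcrdma_buffer_get() does for the recv list):

static void
rpcrdma_put_req_locked(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
	list_add_tail(&req->rl_free, &buf->rb_send_bufs);
}

static struct rpcrdma_req *
rpcrdma_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}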
>
> /*
> * Get a set of request/reply buffers.
> *
> - * Reply buffer (if needed) is attached to send buffer upon return.
> - * Rule:
> - * rb_send_index and rb_recv_index MUST always be pointing to the
> - * *next* available buffer (non-NULL). They are incremented after
> - * removing buffers, and decremented *before* returning them.
> + * Reply buffer (if available) is attached to send buffer upon return.
> */
> struct rpcrdma_req *
> rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
> @@ -1129,25 +1112,22 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
>
> spin_lock_irqsave(&buffers->rb_lock, flags);
>
> - if (buffers->rb_send_index == buffers->rb_max_requests) {
> + if (list_empty(&buffers->rb_send_bufs)) {
> spin_unlock_irqrestore(&buffers->rb_lock, flags);
> - dprintk("RPC: %s: out of request buffers\n", __func__);
> - return ((struct rpcrdma_req *)NULL);
> - }
> -
> - req = buffers->rb_send_bufs[buffers->rb_send_index];
> - if (buffers->rb_send_index < buffers->rb_recv_index) {
> - dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
> - __func__,
> - buffers->rb_recv_index - buffers->rb_send_index);
> - req->rl_reply = NULL;
> - } else {
> - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
> - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
> + pr_warn("RPC: %s: out of request buffers\n", __func__);
> + return NULL;
> }
> - buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
> + req = list_first_entry(&buffers->rb_send_bufs,
> + struct rpcrdma_req, rl_free);
> + list_del(&req->rl_free);
>
> + req->rl_reply = NULL;
> + if (!list_empty(&buffers->rb_recv_bufs))
> + req->rl_reply = rpcrdma_buffer_get_locked(buffers);
Would it make sense to check !list_empty() inside _get_locked and handle
a possible NULL return?
> spin_unlock_irqrestore(&buffers->rb_lock, flags);
> +
> + if (!req->rl_reply)
> + pr_warn("RPC: %s: out of reply buffers\n", __func__);
> return req;
> }
>
> @@ -1159,17 +1139,22 @@ void
> rpcrdma_buffer_put(struct rpcrdma_req *req)
> {
> struct rpcrdma_buffer *buffers = req->rl_buffer;
> + struct rpcrdma_rep *rep = req->rl_reply;
> unsigned long flags;
>
> + req->rl_niovs = 0;
> + req->rl_reply = NULL;
> +
> spin_lock_irqsave(&buffers->rb_lock, flags);
> - rpcrdma_buffer_put_sendbuf(req, buffers);
> + list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
> + if (rep)
> + rpcrdma_buffer_put_locked(rep, buffers);
> spin_unlock_irqrestore(&buffers->rb_lock, flags);
> }
>
> /*
> * Recover reply buffers from pool.
> - * This happens when recovering from error conditions.
> - * Post-increment counter/array index.
> + * This happens when recovering from disconnect.
> */
> void
> rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
> @@ -1178,10 +1163,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
> unsigned long flags;
>
> spin_lock_irqsave(&buffers->rb_lock, flags);
> - if (buffers->rb_recv_index < buffers->rb_max_requests) {
> - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
> - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
> - }
> + if (!list_empty(&buffers->rb_recv_bufs))
> + req->rl_reply = rpcrdma_buffer_get_locked(buffers);
> spin_unlock_irqrestore(&buffers->rb_lock, flags);
> }
>
> @@ -1196,7 +1179,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
> unsigned long flags;
>
> spin_lock_irqsave(&buffers->rb_lock, flags);
> - buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
> + rpcrdma_buffer_put_locked(rep, buffers);
> spin_unlock_irqrestore(&buffers->rb_lock, flags);
> }
>
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index a13508b..e6a358f 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -252,6 +252,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
> #define RPCRDMA_MAX_IOVS (2)
>
> struct rpcrdma_req {
> + struct list_head rl_free;
> unsigned int rl_niovs;
> unsigned int rl_nchunks;
> unsigned int rl_connect_cookie;
> @@ -285,12 +286,10 @@ struct rpcrdma_buffer {
> struct list_head rb_all;
> char *rb_pool;
>
> - spinlock_t rb_lock; /* protect buf arrays */
> + spinlock_t rb_lock; /* protect buf lists */
> + struct list_head rb_send_bufs;
> + struct list_head rb_recv_bufs;
> u32 rb_max_requests;
> - int rb_send_index;
> - int rb_recv_index;
> - struct rpcrdma_req **rb_send_bufs;
> - struct rpcrdma_rep **rb_recv_bufs;
> };
> #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
>
>
> --
>
On 9/17/2015 11:46 PM, Chuck Lever wrote:
> To support backward direction calls, I'm going to add an
> svc_rdma_get_context() call in the client RDMA transport.
>
> Called from ->buf_alloc(), we can't sleep waiting for memory.
> So add an API that can get a server op_ctxt but won't sleep.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/svc_rdma.h | 2 ++
> net/sunrpc/xprtrdma/svc_rdma_transport.c | 28 +++++++++++++++++++++++-----
> 2 files changed, 25 insertions(+), 5 deletions(-)
>
> diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> index 6ce7495..2500dd1 100644
> --- a/include/linux/sunrpc/svc_rdma.h
> +++ b/include/linux/sunrpc/svc_rdma.h
> @@ -224,6 +224,8 @@ extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
> extern int svc_rdma_post_recv(struct svcxprt_rdma *);
> extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
> extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
> +extern struct svc_rdma_op_ctxt *svc_rdma_get_context_gfp(struct svcxprt_rdma *,
> + gfp_t);
> extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
> extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
> extern struct svc_rdma_req_map *svc_rdma_get_req_map(void);
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> index 23aba30..c4083a3 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> @@ -153,17 +153,35 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt)
> }
> #endif /* CONFIG_SUNRPC_BACKCHANNEL */
>
> -struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
> +static void svc_rdma_init_context(struct svcxprt_rdma *xprt,
> + struct svc_rdma_op_ctxt *ctxt)
> {
> - struct svc_rdma_op_ctxt *ctxt;
> -
> - ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep,
> - GFP_KERNEL | __GFP_NOFAIL);
> ctxt->xprt = xprt;
> INIT_LIST_HEAD(&ctxt->dto_q);
> ctxt->count = 0;
> ctxt->frmr = NULL;
> atomic_inc(&xprt->sc_ctxt_used);
> +}
> +
> +struct svc_rdma_op_ctxt *svc_rdma_get_context_gfp(struct svcxprt_rdma *xprt,
> + gfp_t flags)
> +{
> + struct svc_rdma_op_ctxt *ctxt;
> +
> + ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep, flags);
> + if (!ctxt)
> + return NULL;
> + svc_rdma_init_context(xprt, ctxt);
> + return ctxt;
> +}
> +
> +struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
> +{
Why not:
return svc_rdma_get_context_gfp(xprt, GFP_KERNEL | __GFP_NOFAIL);
?
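That is, the sleeping variant could collapse to a one-line wrapper
(a sketch of the suggestion above, not the patch as posted):

struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
	/* __GFP_NOFAIL means the allocation cannot return NULL here */
	return svc_rdma_get_context_gfp(xprt, GFP_KERNEL | __GFP_NOFAIL);
}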
> + struct svc_rdma_op_ctxt *ctxt;
> +
> + ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep,
> + GFP_KERNEL | __GFP_NOFAIL);
> + svc_rdma_init_context(xprt, ctxt);
> return ctxt;
> }
>
>
> --
>
On Fri, Sep 18, 2015 at 7:49 PM, Chuck Lever <[email protected]> wrote:
> Hi Devesh-
>
>
> On Sep 18, 2015, at 2:52 AM, Devesh Sharma <[email protected]> wrote:
>
>> On Fri, Sep 18, 2015 at 2:14 AM, Chuck Lever <[email protected]> wrote:
>>>
>>> Commit 8301a2c047cc ("xprtrdma: Limit work done by completion
>>> handler") was supposed to prevent xprtrdma's upcall handlers from
>>> starving other softIRQ work by letting them return to the provider
>>> before all CQEs have been polled.
>>>
>>> The logic assumes the provider will call the upcall handler again
>>> immediately if the CQ is re-armed while there are still queued CQEs.
>>>
>>> This assumption is invalid. The IBTA spec says that after a CQ is
>>> armed, the hardware must interrupt only when a new CQE is inserted.
>>> xprtrdma can't rely on the provider calling again, even though some
>>> providers do.
>>>
>>> Therefore, leaving CQEs on queue makes sense only when there is
>>> another mechanism that ensures all remaining CQEs are consumed in a
>>> timely fashion. xprtrdma does not have such a mechanism. If a CQE
>>> remains queued, the transport can wait forever to send the next RPC.
>>>
>>> Finally, move the wcs array back onto the stack to ensure that the
>>> poll array is always local to the CPU where the completion upcall is
>>> running.
>>>
>>> Fixes: 8301a2c047cc ("xprtrdma: Limit work done by completion ...")
>>> Signed-off-by: Chuck Lever <[email protected]>
>>> ---
>>> net/sunrpc/xprtrdma/verbs.c | 100 ++++++++++++++++++---------------------
>>> net/sunrpc/xprtrdma/xprt_rdma.h | 5 --
>>> 2 files changed, 45 insertions(+), 60 deletions(-)
>>>
>>> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
>>> index 8a477e2..f2e3863 100644
>>> --- a/net/sunrpc/xprtrdma/verbs.c
>>> +++ b/net/sunrpc/xprtrdma/verbs.c
>>> @@ -158,34 +158,37 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
>>> }
>>> }
>>>
>>> -static int
>>> +/* The wc array is on stack: automatic memory is always CPU-local.
>>> + *
>>> + * The common case is a single completion is ready. By asking
>>> + * for two entries, a return code of 1 means there is exactly
>>> + * one completion and no more. We don't have to poll again to
>>> + * know that the CQ is now empty.
>>> + */
>>> +static void
>>> rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
>>> {
>>> - struct ib_wc *wcs;
>>> - int budget, count, rc;
>>> + struct ib_wc *pos, wcs[2];
>>> + int count, rc;
>>>
>>> - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
>>> do {
>>> - wcs = ep->rep_send_wcs;
>>> + pos = wcs;
>>>
>>> - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
>>> - if (rc <= 0)
>>> - return rc;
>>> + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
>>> + if (rc < 0)
>>> + goto out_warn;
>>>
>>> count = rc;
>>> while (count-- > 0)
>>> - rpcrdma_sendcq_process_wc(wcs++);
>>> - } while (rc == RPCRDMA_POLLSIZE && --budget);
>>> - return 0;
>>> + rpcrdma_sendcq_process_wc(pos++);
>>> + } while (rc == ARRAY_SIZE(wcs));
>>
>> I think I have missed something and not able to understand the reason
>> for polling 2 CQEs in one poll?
>
> See the block comment above.
>
> When ib_poll_cq() returns the same number of WCs as the
> consumer requested, there may still be CQEs waiting to
> be polled. Another call to ib_poll_cq() is needed to find
> out if that's the case.
True...
>
> When ib_poll_cq() returns fewer WCs than the consumer
> requested, the consumer doesn't have to call again to
> know that the CQ is empty.
Agreed, the while loop will terminate here. But what if, immediately
after vendor_poll_cq() decides to report only 1 CQE and the polling
loop terminates, another CQE is added? That new CQE will be polled
only after T usec (where T is the interrupt latency).
>
> The common case, by far, is that one CQE is ready. By
> requesting two, the number returned is less than the
> number requested, and the consumer can tell immediately
> that the CQE is drained. The extra ib_poll_cq call is
> avoided.
>
> Note that the existing logic also relies on polling
> multiple WCs at once, and stopping the loop when the
> number of returned WCs is less than the size of the
> array.
There is also logic to perform extra polling if arming the CQ reports
missed CQEs. If we change the while loop, that missed-CQE handling
might be removable.
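For reference, the pattern I have in mind looks roughly like this
(an illustrative sketch only, not code from this patch; process_wc()
stands in for the real completion handling):

do {
	struct ib_wc wc;

	while (ib_poll_cq(cq, 1, &wc) > 0)
		process_wc(&wc);
	/* A positive return from ib_req_notify_cq() with
	 * IB_CQ_REPORT_MISSED_EVENTS means CQEs may have arrived
	 * after the last poll, so poll again instead of waiting
	 * for the next interrupt.
	 */
} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
			       IB_CQ_REPORT_MISSED_EVENTS) > 0);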
>
>
>> It is possible that in a given poll_cq
>> call you end up getting on 1 completion, the other completion is
>> delayed due to some reason.
>
> If a CQE is allowed to be delayed, how does polling
> again guarantee that the consumer can retrieve it?
It's possible that at the moment vendor_poll_cq() looked at the CQ
memory buffer and decided to report 1 CQE, another CQE was in flight,
but poll_cq had already decided not to report 2.
>
> What happens if a signal occurs, there is only one CQE,
> but it is delayed? ib_poll_cq would return 0 in that
> case, and the consumer would never call again, thinking
> the CQ is empty. There's no way the consumer can know
> for sure when a CQ is drained.
Hardware usually guarantees to signal only after the CQE is dma'ed.
>
> If the delayed CQE happens only when there is more
> than one CQE, how can polling multiple WCs ever work
> reliably?
>
> Maybe I don't understand what is meant by delayed.
>
>
>> Would it be better to poll for 1 in every
>> poll call Or
>> otherwise have this
>> while ( rc <= ARRAY_SIZE(wcs) && rc);
>>
>>> + return;
>>> +
>>> +out_warn:
>>> + pr_warn("RPC: %s: ib_poll_cq() failed %i\n", __func__, rc);
>>> }
>>>
>>> -/*
>>> - * Handle send, fast_reg_mr, and local_inv completions.
>>> - *
>>> - * Send events are typically suppressed and thus do not result
>>> - * in an upcall. Occasionally one is signaled, however. This
>>> - * prevents the provider's completion queue from wrapping and
>>> - * losing a completion.
>>> +/* Handle provider send completion upcalls.
>>> */
>>> static void
>>> rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
>>> @@ -193,12 +196,7 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
>>> struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
>>> int rc;
>>>
>>> - rc = rpcrdma_sendcq_poll(cq, ep);
>>> - if (rc) {
>>> - dprintk("RPC: %s: ib_poll_cq failed: %i\n",
>>> - __func__, rc);
>>> - return;
>>> - }
>>> + rpcrdma_sendcq_poll(cq, ep);
>>>
>>> rc = ib_req_notify_cq(cq,
>>> IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
>>> @@ -247,44 +245,41 @@ out_fail:
>>> goto out_schedule;
>>> }
>>>
>>> -static int
>>> +/* The wc array is on stack: automatic memory is always CPU-local.
>>> + *
>>> + * struct ib_wc is 64 bytes, making the poll array potentially
>>> + * large. But this is at the bottom of the call chain. Further
>>> + * substantial work is done in another thread.
>>> + */
>>> +static void
>>> rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
>>> {
>>> - struct list_head sched_list;
>>> - struct ib_wc *wcs;
>>> - int budget, count, rc;
>>> + struct ib_wc *pos, wcs[4];
>>> + LIST_HEAD(sched_list);
>>> + int count, rc;
>>>
>>> - INIT_LIST_HEAD(&sched_list);
>>> - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
>>> do {
>>> - wcs = ep->rep_recv_wcs;
>>> + pos = wcs;
>>>
>>> - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
>>> - if (rc <= 0)
>>> - goto out_schedule;
>>> + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
>>> + if (rc < 0)
>>> + goto out_warn;
>>>
>>> count = rc;
>>> while (count-- > 0)
>>> - rpcrdma_recvcq_process_wc(wcs++, &sched_list);
>>> - } while (rc == RPCRDMA_POLLSIZE && --budget);
>>> - rc = 0;
>>> + rpcrdma_recvcq_process_wc(pos++, &sched_list);
>>> + } while (rc == ARRAY_SIZE(wcs));
>>>
>>> out_schedule:
>>> rpcrdma_schedule_tasklet(&sched_list);
>>> - return rc;
>>> + return;
>>> +
>>> +out_warn:
>>> + pr_warn("RPC: %s: ib_poll_cq() failed %i\n", __func__, rc);
>>> + goto out_schedule;
>>> }
>>>
>>> -/*
>>> - * Handle receive completions.
>>> - *
>>> - * It is reentrant but processes single events in order to maintain
>>> - * ordering of receives to keep server credits.
>>> - *
>>> - * It is the responsibility of the scheduled tasklet to return
>>> - * recv buffers to the pool. NOTE: this affects synchronization of
>>> - * connection shutdown. That is, the structures required for
>>> - * the completion of the reply handler must remain intact until
>>> - * all memory has been reclaimed.
>>> +/* Handle provider receive completion upcalls.
>>> */
>>> static void
>>> rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
>>> @@ -292,12 +287,7 @@ rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
>>> struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
>>> int rc;
>>>
>>> - rc = rpcrdma_recvcq_poll(cq, ep);
>>> - if (rc) {
>>> - dprintk("RPC: %s: ib_poll_cq failed: %i\n",
>>> - __func__, rc);
>>> - return;
>>> - }
>>> + rpcrdma_recvcq_poll(cq, ep);
>>>
>>> rc = ib_req_notify_cq(cq,
>>> IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
>>> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
>>> index c09414e..42c8d44 100644
>>> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
>>> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
>>> @@ -77,9 +77,6 @@ struct rpcrdma_ia {
>>> * RDMA Endpoint -- one per transport instance
>>> */
>>>
>>> -#define RPCRDMA_WC_BUDGET (128)
>>> -#define RPCRDMA_POLLSIZE (16)
>>> -
>>> struct rpcrdma_ep {
>>> atomic_t rep_cqcount;
>>> int rep_cqinit;
>>> @@ -89,8 +86,6 @@ struct rpcrdma_ep {
>>> struct rdma_conn_param rep_remote_cma;
>>> struct sockaddr_storage rep_remote_addr;
>>> struct delayed_work rep_connect_worker;
>>> - struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
>>> - struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
>>> };
>>>
>>> /*
>>>
>>> --
>> --
>
> --
> Chuck Lever
>
>
>
On Sun, Sep 20, 2015 at 4:05 PM, Sagi Grimberg <[email protected]> wrote:
>>> It is possible that in a given poll_cq
>>> call you end up getting on 1 completion, the other completion is
>>> delayed due to some reason.
>>
>>
>> If a CQE is allowed to be delayed, how does polling
>> again guarantee that the consumer can retrieve it?
>>
>> What happens if a signal occurs, there is only one CQE,
>> but it is delayed? ib_poll_cq would return 0 in that
>> case, and the consumer would never call again, thinking
>> the CQ is empty. There's no way the consumer can know
>> for sure when a CQ is drained.
>>
>> If the delayed CQE happens only when there is more
>> than one CQE, how can polling multiple WCs ever work
>> reliably?
>>
>> Maybe I don't understand what is meant by delayed.
>>
>
> If I'm not mistaken, Devesh meant that if another CQE is generated
> between ib_poll_cq (where you polled the last 2 wcs) and the while
> statement, then you lose a bit of efficiency. Correct?
Yes, that's the point.
>
>
>>
>>> Would it be better to poll for 1 in every
>>> poll call Or
>>> otherwise have this
>>> while ( rc <= ARRAY_SIZE(wcs) && rc);
>>>
>
Looks good.
On Fri, Sep 18, 2015 at 2:14 AM, Chuck Lever <[email protected]> wrote:
> After adding a swapfile on an NFS/RDMA mount and removing the
> normal swap partition, I was able to push the NFS client well
> into swap without any issue.
>
> I forgot to swapoff the NFS file before rebooting. This pinned
> the NFS mount and the IB core and provider, causing shutdown to
> hang. I think this is expected and safe behavior. Probably
> shutdown scripts should "swapoff -a" before unmounting any
> filesystems.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/transport.c | 2 +-
> 1 file changed, 1 insertion(+), 1 deletion(-)
>
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index 41e452b..e9e5ed7 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
> static int
> xprt_rdma_enable_swap(struct rpc_xprt *xprt)
> {
> - return -EINVAL;
> + return 0;
> }
>
> static void
>
> --
Looks good, will test this on ocrdma and update you.
On Fri, Sep 18, 2015 at 2:14 AM, Chuck Lever <[email protected]> wrote:
> The core API has changed so that devices that do not have a global
> DMA lkey automatically create an mr, per-PD, and make that lkey
> available. The global DMA lkey interface is going away in favor of
> the per-PD DMA lkey.
>
> The per-PD DMA lkey is always available. Convert xprtrdma to use the
> device's per-PD DMA lkey for regbufs, no matter which memory
> registration scheme is in use.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/fmr_ops.c | 19 -------------------
> net/sunrpc/xprtrdma/frwr_ops.c | 5 -----
> net/sunrpc/xprtrdma/physical_ops.c | 10 +---------
> net/sunrpc/xprtrdma/verbs.c | 2 +-
> net/sunrpc/xprtrdma/xprt_rdma.h | 1 -
> 5 files changed, 2 insertions(+), 35 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
> index cb25c89..f1e8daf 100644
> --- a/net/sunrpc/xprtrdma/fmr_ops.c
> +++ b/net/sunrpc/xprtrdma/fmr_ops.c
> @@ -39,25 +39,6 @@ static int
> fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
> struct rpcrdma_create_data_internal *cdata)
> {
> - struct ib_device_attr *devattr = &ia->ri_devattr;
> - struct ib_mr *mr;
> -
> - /* Obtain an lkey to use for the regbufs, which are
> - * protected from remote access.
> - */
> - if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
> - ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
> - } else {
> - mr = ib_get_dma_mr(ia->ri_pd, IB_ACCESS_LOCAL_WRITE);
> - if (IS_ERR(mr)) {
> - pr_err("%s: ib_get_dma_mr for failed with %lX\n",
> - __func__, PTR_ERR(mr));
> - return -ENOMEM;
> - }
> - ia->ri_dma_lkey = ia->ri_dma_mr->lkey;
> - ia->ri_dma_mr = mr;
> - }
> -
> return 0;
> }
>
> diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
> index 21b3efb..004f1ad 100644
> --- a/net/sunrpc/xprtrdma/frwr_ops.c
> +++ b/net/sunrpc/xprtrdma/frwr_ops.c
> @@ -189,11 +189,6 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
> struct ib_device_attr *devattr = &ia->ri_devattr;
> int depth, delta;
>
> - /* Obtain an lkey to use for the regbufs, which are
> - * protected from remote access.
> - */
> - ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
> -
> ia->ri_max_frmr_depth =
> min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
> devattr->max_fast_reg_page_list_len);
> diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
> index 72cf8b1..617b76f 100644
> --- a/net/sunrpc/xprtrdma/physical_ops.c
> +++ b/net/sunrpc/xprtrdma/physical_ops.c
> @@ -23,7 +23,6 @@ static int
> physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
> struct rpcrdma_create_data_internal *cdata)
> {
> - struct ib_device_attr *devattr = &ia->ri_devattr;
> struct ib_mr *mr;
>
> /* Obtain an rkey to use for RPC data payloads.
> @@ -37,15 +36,8 @@ physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
> __func__, PTR_ERR(mr));
> return -ENOMEM;
> }
> - ia->ri_dma_mr = mr;
> -
> - /* Obtain an lkey to use for regbufs.
> - */
> - if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)
> - ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
> - else
> - ia->ri_dma_lkey = ia->ri_dma_mr->lkey;
>
> + ia->ri_dma_mr = mr;
> return 0;
> }
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index 01a314a..8a477e2 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -1255,7 +1255,7 @@ rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
> goto out_free;
>
> iov->length = size;
> - iov->lkey = ia->ri_dma_lkey;
> + iov->lkey = ia->ri_pd->local_dma_lkey;
> rb->rg_size = size;
> rb->rg_owner = NULL;
> return rb;
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index 0251222..c09414e 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -65,7 +65,6 @@ struct rpcrdma_ia {
> struct rdma_cm_id *ri_id;
> struct ib_pd *ri_pd;
> struct ib_mr *ri_dma_mr;
> - u32 ri_dma_lkey;
> struct completion ri_done;
> int ri_async_rc;
> unsigned int ri_max_frmr_depth;
>
> --
Looks good.
On Fri, Sep 18, 2015 at 2:14 AM, Chuck Lever <[email protected]> wrote:
> Clean up: The error cases in rpcrdma_reply_handler() almost never
> execute. Ensure the compiler places them out of the hot path.
>
> No behavior change expected.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/rpc_rdma.c | 90 ++++++++++++++++++++++-----------------
> net/sunrpc/xprtrdma/verbs.c | 2 -
> net/sunrpc/xprtrdma/xprt_rdma.h | 2 +
> 3 files changed, 54 insertions(+), 40 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
> index bc8bd65..287c874 100644
> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -741,52 +741,27 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> unsigned long cwnd;
> u32 credits;
>
> - /* Check status. If bad, signal disconnect and return rep to pool */
> - if (rep->rr_len == ~0U) {
> - rpcrdma_recv_buffer_put(rep);
> - if (r_xprt->rx_ep.rep_connected == 1) {
> - r_xprt->rx_ep.rep_connected = -EIO;
> - rpcrdma_conn_func(&r_xprt->rx_ep);
> - }
> - return;
> - }
> - if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
> - dprintk("RPC: %s: short/invalid reply\n", __func__);
> - goto repost;
> - }
> + dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
> +
> + if (rep->rr_len == RPCRDMA_BAD_LEN)
> + goto out_badstatus;
> + if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
> + goto out_shortreply;
> +
> headerp = rdmab_to_msg(rep->rr_rdmabuf);
> - if (headerp->rm_vers != rpcrdma_version) {
> - dprintk("RPC: %s: invalid version %d\n",
> - __func__, be32_to_cpu(headerp->rm_vers));
> - goto repost;
> - }
> + if (headerp->rm_vers != rpcrdma_version)
> + goto out_badversion;
>
> /* Get XID and try for a match. */
> spin_lock(&xprt->transport_lock);
> rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
> - if (rqst == NULL) {
> - spin_unlock(&xprt->transport_lock);
> - dprintk("RPC: %s: reply 0x%p failed "
> - "to match any request xid 0x%08x len %d\n",
> - __func__, rep, be32_to_cpu(headerp->rm_xid),
> - rep->rr_len);
> -repost:
> - r_xprt->rx_stats.bad_reply_count++;
> - if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
> - rpcrdma_recv_buffer_put(rep);
> -
> - return;
> - }
> + if (!rqst)
> + goto out_nomatch;
>
> /* get request object */
> req = rpcr_to_rdmar(rqst);
> - if (req->rl_reply) {
> - spin_unlock(&xprt->transport_lock);
> - dprintk("RPC: %s: duplicate reply 0x%p to RPC "
> - "request 0x%p: xid 0x%08x\n", __func__, rep, req,
> - be32_to_cpu(headerp->rm_xid));
> - goto repost;
> - }
> + if (req->rl_reply)
> + goto out_duplicate;
>
> dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
> " RPC request 0x%p xid 0x%08x\n",
> @@ -883,8 +858,45 @@ badheader:
> if (xprt->cwnd > cwnd)
> xprt_release_rqst_cong(rqst->rq_task);
>
> + xprt_complete_rqst(rqst->rq_task, status);
> + spin_unlock(&xprt->transport_lock);
> dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
> __func__, xprt, rqst, status);
> - xprt_complete_rqst(rqst->rq_task, status);
> + return;
> +
> +out_badstatus:
> + rpcrdma_recv_buffer_put(rep);
> + if (r_xprt->rx_ep.rep_connected == 1) {
> + r_xprt->rx_ep.rep_connected = -EIO;
> + rpcrdma_conn_func(&r_xprt->rx_ep);
> + }
> + return;
> +
> +out_shortreply:
> + dprintk("RPC: %s: short/invalid reply\n", __func__);
> + goto repost;
> +
> +out_badversion:
> + dprintk("RPC: %s: invalid version %d\n",
> + __func__, be32_to_cpu(headerp->rm_vers));
> + goto repost;
> +
> +out_nomatch:
> + spin_unlock(&xprt->transport_lock);
> + dprintk("RPC: %s: reply 0x%p failed "
> + "to match any request xid 0x%08x len %d\n",
> + __func__, rep, be32_to_cpu(headerp->rm_xid),
> + rep->rr_len);
> + goto repost;
> +
> +out_duplicate:
> spin_unlock(&xprt->transport_lock);
> + dprintk("RPC: %s: duplicate reply 0x%p to RPC "
> + "request 0x%p: xid 0x%08x\n", __func__, rep, req,
> + be32_to_cpu(headerp->rm_xid));
> +
> +repost:
> + r_xprt->rx_stats.bad_reply_count++;
> + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
> + rpcrdma_recv_buffer_put(rep);
> }
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index f2e3863..ac1345b 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -241,7 +241,7 @@ out_fail:
> if (wc->status != IB_WC_WR_FLUSH_ERR)
> pr_err("RPC: %s: rep %p: %s\n",
> __func__, rep, ib_wc_status_msg(wc->status));
> - rep->rr_len = ~0U;
> + rep->rr_len = RPCRDMA_BAD_LEN;
> goto out_schedule;
> }
>
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index 42c8d44..a13508b 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -168,6 +168,8 @@ struct rpcrdma_rep {
> struct rpcrdma_regbuf *rr_rdmabuf;
> };
>
> +#define RPCRDMA_BAD_LEN (~0U)
> +
> /*
> * struct rpcrdma_mw - external memory region metadata
> *
>
> --
Looks good.
On Fri, Sep 18, 2015 at 2:15 AM, Chuck Lever <[email protected]> wrote:
> xprtrdma's backward direction send and receive buffers are the same
> size as the forechannel's inline threshold, and must be pre-
> registered.
>
> The consumer has no control over which receive buffer the adapter
> chooses to catch an incoming backwards-direction call. Any receive
> buffer can be used for either a forward reply or a backward call.
> Thus both types of RPC message must all be the same size.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/Makefile | 1
> net/sunrpc/xprtrdma/backchannel.c | 204 +++++++++++++++++++++++++++++++++++++
> net/sunrpc/xprtrdma/transport.c | 7 +
> net/sunrpc/xprtrdma/verbs.c | 92 ++++++++++++++---
> net/sunrpc/xprtrdma/xprt_rdma.h | 20 ++++
> 5 files changed, 309 insertions(+), 15 deletions(-)
> create mode 100644 net/sunrpc/xprtrdma/backchannel.c
>
> diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
> index 48913de..33f99d3 100644
> --- a/net/sunrpc/xprtrdma/Makefile
> +++ b/net/sunrpc/xprtrdma/Makefile
> @@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
> svc_rdma.o svc_rdma_transport.o \
> svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
> module.o
> +rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
> diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
> new file mode 100644
> index 0000000..c0a42ad
> --- /dev/null
> +++ b/net/sunrpc/xprtrdma/backchannel.c
> @@ -0,0 +1,204 @@
> +/*
> + * Copyright (c) 2015 Oracle. All rights reserved.
> + *
> + * Support for backward direction RPCs on RPC/RDMA.
> + */
> +
> +#include <linux/module.h>
> +
> +#include "xprt_rdma.h"
> +
> +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
> +# define RPCDBG_FACILITY RPCDBG_TRANS
> +#endif
> +
> +static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
> + struct rpc_rqst *rqst)
> +{
> + struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
> + struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
> +
> + spin_lock(&buf->rb_reqslock);
> + list_del(&req->rl_all);
> + spin_unlock(&buf->rb_reqslock);
> +
> + rpcrdma_destroy_req(&r_xprt->rx_ia, req);
> +
> + kfree(rqst);
> +}
> +
> +static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
> + struct rpc_rqst *rqst)
> +{
> + struct rpcrdma_ia *ia = &r_xprt->rx_ia;
> + struct rpcrdma_regbuf *rb;
> + struct rpcrdma_req *req;
> + struct xdr_buf *buf;
> + size_t size;
> +
> + req = rpcrdma_create_req(r_xprt);
> + if (!req)
> + return -ENOMEM;
> + req->rl_backchannel = true;
> +
> + size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
> + rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
> + if (IS_ERR(rb))
> + goto out_fail;
> + req->rl_rdmabuf = rb;
> +
> + size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
> + rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
> + if (IS_ERR(rb))
> + goto out_fail;
> + rb->rg_owner = req;
> + req->rl_sendbuf = rb;
> + /* so that rpcr_to_rdmar works when receiving a request */
> + rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
> +
> + buf = &rqst->rq_snd_buf;
> + buf->head[0].iov_base = rqst->rq_buffer;
> + buf->head[0].iov_len = 0;
> + buf->tail[0].iov_base = NULL;
> + buf->tail[0].iov_len = 0;
> + buf->page_len = 0;
> + buf->len = 0;
> + buf->buflen = size;
> +
> + return 0;
> +
> +out_fail:
> + rpcrdma_bc_free_rqst(r_xprt, rqst);
> + return -ENOMEM;
> +}
> +
> +/* Allocate and add receive buffers to the rpcrdma_buffer's existing
> + * list of rep's. These are released when the transport is destroyed. */
> +static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
> + unsigned int count)
> +{
> + struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
> + struct rpcrdma_rep *rep;
> + unsigned long flags;
> + int rc = 0;
> +
> + while (count--) {
> + rep = rpcrdma_create_rep(r_xprt);
> + if (IS_ERR(rep)) {
> + pr_err("RPC: %s: reply buffer alloc failed\n",
> + __func__);
> + rc = PTR_ERR(rep);
> + break;
> + }
> +
> + spin_lock_irqsave(&buffers->rb_lock, flags);
> + list_add(&rep->rr_list, &buffers->rb_recv_bufs);
> + spin_unlock_irqrestore(&buffers->rb_lock, flags);
> + }
> +
> + return rc;
> +}
> +
> +/**
> + * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
> + * @xprt: transport associated with these backchannel resources
> + * @reqs: number of concurrent incoming requests to expect
> + *
> + * Returns 0 on success; otherwise a negative errno
> + */
> +int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
> +{
> + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> + struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
> + struct rpc_rqst *rqst;
> + unsigned int i;
> + int rc;
> +
> + /* The backchannel reply path returns each rpc_rqst to the
> + * bc_pa_list _after_ the reply is sent. If the server is
> + * faster than the client, it can send another backward
> + * direction request before the rpc_rqst is returned to the
> + * list. The client rejects the request in this case.
> + *
> + * Twice as many rpc_rqsts are prepared to ensure there is
> + * always an rpc_rqst available as soon as a reply is sent.
> + */
> + for (i = 0; i < (reqs << 1); i++) {
> + rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
> + if (!rqst) {
> + pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
> + __func__);
> + goto out_free;
> + }
> +
> + rqst->rq_xprt = &r_xprt->rx_xprt;
> + INIT_LIST_HEAD(&rqst->rq_list);
> + INIT_LIST_HEAD(&rqst->rq_bc_list);
> +
> + if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
> + goto out_free;
> +
> + spin_lock_bh(&xprt->bc_pa_lock);
> + list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
> + spin_unlock_bh(&xprt->bc_pa_lock);
> + }
> +
> + rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
> + if (rc)
> + goto out_free;
> +
> + rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
> + if (rc)
> + goto out_free;
> +
> + buffer->rb_bc_srv_max_requests = reqs;
> + request_module("svcrdma");
> +
> + return 0;
> +
> +out_free:
> + xprt_rdma_bc_destroy(xprt, reqs);
> +
> + pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
> + return -ENOMEM;
> +}
> +
> +/**
> + * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
> + * @xprt: transport associated with these backchannel resources
> + * @reqs: number of incoming requests to destroy; ignored
> + */
> +void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
> +{
> + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> + struct rpc_rqst *rqst, *tmp;
> +
> + spin_lock_bh(&xprt->bc_pa_lock);
> + list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
> + list_del(&rqst->rq_bc_pa_list);
> + spin_unlock_bh(&xprt->bc_pa_lock);
> +
> + rpcrdma_bc_free_rqst(r_xprt, rqst);
> +
> + spin_lock_bh(&xprt->bc_pa_lock);
> + }
> + spin_unlock_bh(&xprt->bc_pa_lock);
> +}
> +
> +/**
> + * xprt_rdma_bc_free_rqst - Release a backchannel rqst
> + * @rqst: request to release
> + */
> +void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
> +{
> + struct rpc_xprt *xprt = rqst->rq_xprt;
> +
> + smp_mb__before_atomic();
> + WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
> + clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
> + smp_mb__after_atomic();
> +
> + spin_lock_bh(&xprt->bc_pa_lock);
> + list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
> + spin_unlock_bh(&xprt->bc_pa_lock);
> +}
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index e9e5ed7..e3871a6 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -705,7 +705,12 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
> .print_stats = xprt_rdma_print_stats,
> .enable_swap = xprt_rdma_enable_swap,
> .disable_swap = xprt_rdma_disable_swap,
> - .inject_disconnect = xprt_rdma_inject_disconnect
> + .inject_disconnect = xprt_rdma_inject_disconnect,
> +#if defined(CONFIG_SUNRPC_BACKCHANNEL)
> + .bc_setup = xprt_rdma_bc_setup,
> + .bc_free_rqst = xprt_rdma_bc_free_rqst,
> + .bc_destroy = xprt_rdma_bc_destroy,
> +#endif
> };
>
> static struct xprt_class xprt_rdma = {
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index 8d99214..1e4a948 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -877,7 +877,22 @@ retry:
> }
> rc = ep->rep_connected;
> } else {
> + struct rpcrdma_xprt *r_xprt;
> + unsigned int extras;
> +
> dprintk("RPC: %s: connected\n", __func__);
> +
> + r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
> + extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
> +
> + if (extras) {
> + rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
> + if (rc)
> + pr_err("%s: could not post "
> + "extra receive buffers: %i\n",
> + __func__, rc);
> + rc = 0;
> + }
> }
>
> out:
> @@ -914,20 +929,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
> }
> }
>
> -static struct rpcrdma_req *
> +struct rpcrdma_req *
> rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
> {
> + struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
> struct rpcrdma_req *req;
>
> req = kzalloc(sizeof(*req), GFP_KERNEL);
> if (req == NULL)
> return ERR_PTR(-ENOMEM);
>
> + INIT_LIST_HEAD(&req->rl_free);
> + spin_lock(&buffer->rb_reqslock);
> + list_add(&req->rl_all, &buffer->rb_allreqs);
> + spin_unlock(&buffer->rb_reqslock);
> req->rl_buffer = &r_xprt->rx_buf;
> return req;
> }
>
> -static struct rpcrdma_rep *
> +struct rpcrdma_rep *
> rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
> {
> struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
> @@ -965,6 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
> int i, rc;
>
> buf->rb_max_requests = r_xprt->rx_data.max_requests;
> + buf->rb_bc_srv_max_requests = 0;
> spin_lock_init(&buf->rb_lock);
>
> rc = ia->ri_ops->ro_init(r_xprt);
> @@ -972,6 +993,8 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
> goto out;
>
> INIT_LIST_HEAD(&buf->rb_send_bufs);
> + INIT_LIST_HEAD(&buf->rb_allreqs);
> + spin_lock_init(&buf->rb_reqslock);
> for (i = 0; i < buf->rb_max_requests; i++) {
> struct rpcrdma_req *req;
>
> @@ -982,6 +1005,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
> rc = PTR_ERR(req);
> goto out;
> }
> + req->rl_backchannel = false;
> list_add(&req->rl_free, &buf->rb_send_bufs);
> }
>
> @@ -1008,19 +1032,13 @@ out:
> static void
> rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
> {
> - if (!rep)
> - return;
> -
> rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
> kfree(rep);
> }
>
> -static void
> +void
> rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
> {
> - if (!req)
> - return;
> -
> rpcrdma_free_regbuf(ia, req->rl_sendbuf);
> rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
> kfree(req);
> @@ -1040,14 +1058,20 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
> rpcrdma_destroy_rep(ia, rep);
> }
>
> - while (!list_empty(&buf->rb_send_bufs)) {
> - struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next,
> + spin_lock(&buf->rb_reqslock);
> + while (!list_empty(&buf->rb_allreqs)) {
> + struct rpcrdma_req *req = list_entry(buf->rb_allreqs.next,
> struct rpcrdma_req,
> - rl_free);
> + rl_all);
> +
> + list_del(&req->rl_all);
> + spin_unlock(&buf->rb_reqslock);
>
> - list_del(&req->rl_free);
> rpcrdma_destroy_req(ia, req);
> +
> + spin_lock(&buf->rb_reqslock);
> }
> + spin_unlock(&buf->rb_reqslock);
>
> ia->ri_ops->ro_destroy(buf);
> }
> @@ -1094,7 +1118,7 @@ rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf)
>
> rep = list_first_entry(&buf->rb_recv_bufs,
> struct rpcrdma_rep, rr_list);
> - list_del(&rep->rr_list);
> + list_del_init(&rep->rr_list);
>
> return rep;
> }
> @@ -1337,6 +1361,46 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
> return rc;
> }
>
> +/**
> + * rpcrdma_bc_post_recv - Post buffers to catch incoming backchannel requests
> + * @r_xprt: transport associated with these backchannel resources
> + * @min_reqs: minimum number of incoming requests expected
> + *
> + * Returns zero if all requested buffers were posted, or a negative errno.
> + */
> +int
> +rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
> +{
> + struct rpcrdma_ia *ia = &r_xprt->rx_ia;
> + struct rpcrdma_ep *ep = &r_xprt->rx_ep;
> + struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
> + struct rpcrdma_rep *rep;
> + unsigned long flags;
> + int rc;
> +
> + while (count--) {
> + rep = NULL;
> + spin_lock_irqsave(&buffers->rb_lock, flags);
> + if (!list_empty(&buffers->rb_recv_bufs))
> + rep = rpcrdma_buffer_get_locked(buffers);
> + spin_unlock_irqrestore(&buffers->rb_lock, flags);
> + if (!rep) {
> + pr_err("%s: no extra receive buffers\n", __func__);
> + return -ENOMEM;
> + }
> +
> + rc = rpcrdma_ep_post_recv(ia, ep, rep);
> + if (rc) {
> + spin_lock_irqsave(&buffers->rb_lock, flags);
> + rpcrdma_buffer_put_locked(rep, buffers);
> + spin_unlock_irqrestore(&buffers->rb_lock, flags);
> + return rc;
> + }
> + }
> +
> + return 0;
> +}
> +
> /* How many chunk list items fit within our inline buffers?
> */
> unsigned int
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index e6a358f..2ca0567 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -262,6 +262,9 @@ struct rpcrdma_req {
> struct rpcrdma_regbuf *rl_rdmabuf;
> struct rpcrdma_regbuf *rl_sendbuf;
> struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
> +
> + struct list_head rl_all;
> + bool rl_backchannel;
> };
>
> static inline struct rpcrdma_req *
> @@ -290,6 +293,10 @@ struct rpcrdma_buffer {
> struct list_head rb_send_bufs;
> struct list_head rb_recv_bufs;
> u32 rb_max_requests;
> +
> + u32 rb_bc_srv_max_requests;
> + spinlock_t rb_reqslock; /* protect rb_allreqs */
> + struct list_head rb_allreqs;
> };
> #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
>
> @@ -410,6 +417,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
> /*
> * Buffer calls - xprtrdma/verbs.c
> */
> +struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
> +struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
> +void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
> int rpcrdma_buffer_create(struct rpcrdma_xprt *);
> void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
>
> @@ -426,6 +436,7 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
> struct rpcrdma_regbuf *);
>
> unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
> +int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
>
> int frwr_alloc_recovery_wq(void);
> void frwr_destroy_recovery_wq(void);
> @@ -490,6 +501,15 @@ int rpcrdma_marshal_req(struct rpc_rqst *);
> int xprt_rdma_init(void);
> void xprt_rdma_cleanup(void);
>
> +/* Backchannel calls - xprtrdma/backchannel.c
> + */
> +#if defined(CONFIG_SUNRPC_BACKCHANNEL)
> +int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
> +int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
> +void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
> +void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
> +#endif /* CONFIG_SUNRPC_BACKCHANNEL */
> +
> /* Temporary NFS request map cache. Created in svc_rdma.c */
> extern struct kmem_cache *svc_rdma_map_cachep;
> /* WR context cache. Created in svc_rdma.c */
>
> --
On Fri, Sep 18, 2015 at 2:15 AM, Chuck Lever <[email protected]> wrote:
> Pre-allocate extra send and receive Work Requests needed to handle
> backchannel receives and sends.
>
> The transport doesn't know how many extra WRs to pre-allocate until
> the xprt_setup_backchannel() call, but that's long after the WRs are
> allocated during forechannel setup.
>
> So, use a fixed value for now.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/backchannel.c | 4 ++++
> net/sunrpc/xprtrdma/verbs.c | 14 ++++++++++++--
> net/sunrpc/xprtrdma/xprt_rdma.h | 10 ++++++++++
> 3 files changed, 26 insertions(+), 2 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
> index c0a42ad..f5c7122 100644
> --- a/net/sunrpc/xprtrdma/backchannel.c
> +++ b/net/sunrpc/xprtrdma/backchannel.c
> @@ -123,6 +123,9 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
> * Twice as many rpc_rqsts are prepared to ensure there is
> * always an rpc_rqst available as soon as a reply is sent.
> */
> + if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
> + goto out_err;
> +
> for (i = 0; i < (reqs << 1); i++) {
> rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
> if (!rqst) {
> @@ -159,6 +162,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
> out_free:
> xprt_rdma_bc_destroy(xprt, reqs);
>
> +out_err:
> pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
> return -ENOMEM;
> }
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index 1e4a948..133c720 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -614,6 +614,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
> struct ib_device_attr *devattr = &ia->ri_devattr;
> struct ib_cq *sendcq, *recvcq;
> struct ib_cq_init_attr cq_attr = {};
> + unsigned int max_qp_wr;
> int rc, err;
>
> if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
> @@ -622,18 +623,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
> return -ENOMEM;
> }
>
> + if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
> + dprintk("RPC: %s: insufficient wqe's available\n",
> + __func__);
> + return -ENOMEM;
> + }
> + max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
> +
> /* check provider's send/recv wr limits */
> - if (cdata->max_requests > devattr->max_qp_wr)
> - cdata->max_requests = devattr->max_qp_wr;
> + if (cdata->max_requests > max_qp_wr)
> + cdata->max_requests = max_qp_wr;
should we
cdata->max_request = max_qp_wr - RPCRDMA_BACKWARD_WRS?
>
> ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
> ep->rep_attr.qp_context = ep;
> ep->rep_attr.srq = NULL;
> ep->rep_attr.cap.max_send_wr = cdata->max_requests;
> + ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
Looks like this will cause a qp-create failure on a hypothetical
device where devattr->max_qp_wr == cdata->max_requests.
> rc = ia->ri_ops->ro_open(ia, ep, cdata);
> if (rc)
> return rc;
> ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
> + ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
> ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
> ep->rep_attr.cap.max_recv_sge = 1;
> ep->rep_attr.cap.max_inline_data = 0;
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index 2ca0567..37d0d7f 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -101,6 +101,16 @@ struct rpcrdma_ep {
> */
> #define RPCRDMA_IGNORE_COMPLETION (0ULL)
>
> +/* Pre-allocate extra Work Requests for handling backward receives
> + * and sends. This is a fixed value because the Work Queues are
> + * allocated when the forward channel is set up.
> + */
> +#if defined(CONFIG_SUNRPC_BACKCHANNEL)
> +#define RPCRDMA_BACKWARD_WRS (8)
> +#else
> +#define RPCRDMA_BACKWARD_WRS (0)
> +#endif
> +
> /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
> *
> * The below structure appears at the front of a large region of kmalloc'd
>
> --
> On Sep 21, 2015, at 1:51 AM, Devesh Sharma <[email protected]> wrote:
>
> On Sun, Sep 20, 2015 at 4:05 PM, Sagi Grimberg <[email protected]> wrote:
>>>> It is possible that in a given poll_cq
>>>> call you end up getting on 1 completion, the other completion is
>>>> delayed due to some reason.
>>>
>>>
>>> If a CQE is allowed to be delayed, how does polling
>>> again guarantee that the consumer can retrieve it?
>>>
>>> What happens if a signal occurs, there is only one CQE,
>>> but it is delayed? ib_poll_cq would return 0 in that
>>> case, and the consumer would never call again, thinking
>>> the CQ is empty. There's no way the consumer can know
>>> for sure when a CQ is drained.
>>>
>>> If the delayed CQE happens only when there is more
>>> than one CQE, how can polling multiple WCs ever work
>>> reliably?
>>>
>>> Maybe I don't understand what is meant by delayed.
>>>
>>
>> If I'm not mistaken, Devesh meant that if another CQE is generated
>> between ib_poll_cq (where you polled the last 2 wcs) and the while
>> statement, then you lose a bit of efficiency. Correct?
>
> Yes, that's the point.
I’m optimizing for the common case where 1 CQE is ready
to be polled. How much of an efficiency loss are you
talking about, how often would this loss occur, and is
this a problem for all providers / devices?
Is this an issue for the current arrangement where 8 WCs
are polled at a time?
--
Chuck Lever
> On Sep 20, 2015, at 5:40 AM, Sagi Grimberg <[email protected]> wrote:
>
> On 9/17/2015 11:46 PM, Chuck Lever wrote:
>> To support backward direction calls, I'm going to add an
>> svc_rdma_get_context() call in the client RDMA transport.
>>
>> Called from ->buf_alloc(), we can't sleep waiting for memory.
>> So add an API that can get a server op_ctxt but won't sleep.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> include/linux/sunrpc/svc_rdma.h | 2 ++
>> net/sunrpc/xprtrdma/svc_rdma_transport.c | 28 +++++++++++++++++++++++-----
>> 2 files changed, 25 insertions(+), 5 deletions(-)
>>
>> diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
>> index 6ce7495..2500dd1 100644
>> --- a/include/linux/sunrpc/svc_rdma.h
>> +++ b/include/linux/sunrpc/svc_rdma.h
>> @@ -224,6 +224,8 @@ extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
>> extern int svc_rdma_post_recv(struct svcxprt_rdma *);
>> extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
>> extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
>> +extern struct svc_rdma_op_ctxt *svc_rdma_get_context_gfp(struct svcxprt_rdma *,
>> + gfp_t);
>> extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
>> extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
>> extern struct svc_rdma_req_map *svc_rdma_get_req_map(void);
>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>> index 23aba30..c4083a3 100644
>> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
>> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>> @@ -153,17 +153,35 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt)
>> }
>> #endif /* CONFIG_SUNRPC_BACKCHANNEL */
>>
>> -struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
>> +static void svc_rdma_init_context(struct svcxprt_rdma *xprt,
>> + struct svc_rdma_op_ctxt *ctxt)
>> {
>> - struct svc_rdma_op_ctxt *ctxt;
>> -
>> - ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep,
>> - GFP_KERNEL | __GFP_NOFAIL);
>> ctxt->xprt = xprt;
>> INIT_LIST_HEAD(&ctxt->dto_q);
>> ctxt->count = 0;
>> ctxt->frmr = NULL;
>> atomic_inc(&xprt->sc_ctxt_used);
>> +}
>> +
>> +struct svc_rdma_op_ctxt *svc_rdma_get_context_gfp(struct svcxprt_rdma *xprt,
>> + gfp_t flags)
>> +{
>> + struct svc_rdma_op_ctxt *ctxt;
>> +
>> + ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep, flags);
>> + if (!ctxt)
>> + return NULL;
>> + svc_rdma_init_context(xprt, ctxt);
>> + return ctxt;
>> +}
>> +
>> +struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
>> +{
>
> Why not:
> return svc_rdma_get_context_gfp(xprt, GFP_KERNEL | __GFP_NOFAIL);
The “if (!ctxt) return NULL;” is unneeded if __GFP_NOFAIL is
specified.
I’ll wait for additional comments on this one, I could go
either way.
> ?
>
>> + struct svc_rdma_op_ctxt *ctxt;
>> +
>> + ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep,
>> + GFP_KERNEL | __GFP_NOFAIL);
>> + svc_rdma_init_context(xprt, ctxt);
>> return ctxt;
>> }
>>
>>
>> --
>>
>
> --
--
Chuck Lever
> On Sep 21, 2015, at 3:33 AM, Devesh Sharma <[email protected]> wrote:
>
> On Fri, Sep 18, 2015 at 2:15 AM, Chuck Lever <[email protected]> wrote:
>> Pre-allocate extra send and receive Work Requests needed to handle
>> backchannel receives and sends.
>>
>> The transport doesn't know how many extra WRs to pre-allocate until
>> the xprt_setup_backchannel() call, but that's long after the WRs are
>> allocated during forechannel setup.
>>
>> So, use a fixed value for now.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> net/sunrpc/xprtrdma/backchannel.c | 4 ++++
>> net/sunrpc/xprtrdma/verbs.c | 14 ++++++++++++--
>> net/sunrpc/xprtrdma/xprt_rdma.h | 10 ++++++++++
>> 3 files changed, 26 insertions(+), 2 deletions(-)
>>
>> diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
>> index c0a42ad..f5c7122 100644
>> --- a/net/sunrpc/xprtrdma/backchannel.c
>> +++ b/net/sunrpc/xprtrdma/backchannel.c
>> @@ -123,6 +123,9 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
>> * Twice as many rpc_rqsts are prepared to ensure there is
>> * always an rpc_rqst available as soon as a reply is sent.
>> */
>> + if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
>> + goto out_err;
>> +
>> for (i = 0; i < (reqs << 1); i++) {
>> rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
>> if (!rqst) {
>> @@ -159,6 +162,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
>> out_free:
>> xprt_rdma_bc_destroy(xprt, reqs);
>>
>> +out_err:
>> pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
>> return -ENOMEM;
>> }
>> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
>> index 1e4a948..133c720 100644
>> --- a/net/sunrpc/xprtrdma/verbs.c
>> +++ b/net/sunrpc/xprtrdma/verbs.c
>> @@ -614,6 +614,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
>> struct ib_device_attr *devattr = &ia->ri_devattr;
>> struct ib_cq *sendcq, *recvcq;
>> struct ib_cq_init_attr cq_attr = {};
>> + unsigned int max_qp_wr;
>> int rc, err;
>>
>> if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
>> @@ -622,18 +623,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
>> return -ENOMEM;
>> }
>>
>> + if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
>> + dprintk("RPC: %s: insufficient wqe's available\n",
>> + __func__);
>> + return -ENOMEM;
>> + }
>> + max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
>> +
>> /* check provider's send/recv wr limits */
>> - if (cdata->max_requests > devattr->max_qp_wr)
>> - cdata->max_requests = devattr->max_qp_wr;
>> + if (cdata->max_requests > max_qp_wr)
>> + cdata->max_requests = max_qp_wr;
>
> should we
> cdata->max_request = max_qp_wr - RPCRDMA_BACKWARD_WRS?
cdata->max_requests is an input parameter to rpcrdma_ep_create().
We can’t simply overwrite it here with a new, larger value.
>> ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
>> ep->rep_attr.qp_context = ep;
>> ep->rep_attr.srq = NULL;
>> ep->rep_attr.cap.max_send_wr = cdata->max_requests;
>> + ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
>
> Looks like will cause a qp-create failure if any hypothetical device
> supports devattr->max_qp_wr = cdata->max_requests
We’ve already capped cdata->max_requests at
“devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS” above. So, the logic
should prevent that, unless I’ve made a mistake.
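Written out, the bound being relied on here is (illustration only,
not additional code from the patch):

        max_qp_wr            = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS
        cdata->max_requests <= max_qp_wr               (after the new cap)
        cap.max_send_wr      = cdata->max_requests + RPCRDMA_BACKWARD_WRS
                            <= devattr->max_qp_wr

and likewise for cap.max_recv_wr.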
>> rc = ia->ri_ops->ro_open(ia, ep, cdata);
>> if (rc)
>> return rc;
>> ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
>> + ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
>> ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
>> ep->rep_attr.cap.max_recv_sge = 1;
>> ep->rep_attr.cap.max_inline_data = 0;
>> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
>> index 2ca0567..37d0d7f 100644
>> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
>> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
>> @@ -101,6 +101,16 @@ struct rpcrdma_ep {
>> */
>> #define RPCRDMA_IGNORE_COMPLETION (0ULL)
>>
>> +/* Pre-allocate extra Work Requests for handling backward receives
>> + * and sends. This is a fixed value because the Work Queues are
>> + * allocated when the forward channel is set up.
>> + */
>> +#if defined(CONFIG_SUNRPC_BACKCHANNEL)
>> +#define RPCRDMA_BACKWARD_WRS (8)
>> +#else
>> +#define RPCRDMA_BACKWARD_WRS (0)
>> +#endif
>> +
>> /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
>> *
>> * The below structure appears at the front of a large region of kmalloc'd
>>
--
Chuck Lever
> On Sep 20, 2015, at 3:52 AM, Sagi Grimberg <[email protected]> wrote:
>
> On 9/17/2015 11:44 PM, Chuck Lever wrote:
>> The rb_send_bufs and rb_recv_bufs arrays are used to implement a
>> pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep
>> structs. Replace those arrays with free lists.
>>
>> To allow more than 512 RPCs in-flight at once, each of these arrays
>> would be larger than a page (assuming 8-byte addresses and 4KB
>> pages). Allowing up to 64K in-flight RPCs (as TCP now does), each
>> buffer array would have to be 128 pages. That's an order-6
>> allocation. (Not that we're going there.)
>>
>> A list is easier to expand dynamically. Instead of allocating a
>> larger array of pointers and copying the existing pointers to the
>> new array, simply append more buffers to each list.
>>
>> This also makes it simpler to manage receive buffers that might
>> catch backwards-direction calls, or to post receive buffers in
>> bulk to amortize the overhead of ib_post_recv.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>
> Hi Chuck,
>
> I get the idea of this patch, but it is a bit confusing (to a
> non-educated reader).
OK, let’s see if there’s room for additional improvement.
> Can you explain why sometimes you call put/get_locked routines
> and sometimes you open-code them?
Are you talking about the later patch that adds support for
receiving backwards calls? That probably should use the
existing helpers, shouldn’t it?
> And is it mandatory to have
> the callers lock before calling get/put? Perhaps the code would
> be simpler if the get/put routines would take care of locking
> since rb_lock looks dedicated to them.
Not sure I understand this comment; I thought that the helpers
were already doing the locking.
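If the suggestion is that the helpers should take rb_lock themselves,
a sketch of that variant might look like the following (hypothetical
helper name, illustration only, not code from this series):

static struct rpcrdma_rep *
rpcrdma_get_rep(struct rpcrdma_buffer *buf)
{
        struct rpcrdma_rep *rep = NULL;
        unsigned long flags;

        /* The helper acquires rb_lock itself, so callers do not. */
        spin_lock_irqsave(&buf->rb_lock, flags);
        if (!list_empty(&buf->rb_recv_bufs)) {
                rep = list_first_entry(&buf->rb_recv_bufs,
                                       struct rpcrdma_rep, rr_list);
                list_del(&rep->rr_list);
        }
        spin_unlock_irqrestore(&buf->rb_lock, flags);
        return rep;
}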
>
>> ---
>> net/sunrpc/xprtrdma/verbs.c | 141 +++++++++++++++++----------------------
>> net/sunrpc/xprtrdma/xprt_rdma.h | 9 +-
>> 2 files changed, 66 insertions(+), 84 deletions(-)
>>
>> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
>> index ac1345b..8d99214 100644
>> --- a/net/sunrpc/xprtrdma/verbs.c
>> +++ b/net/sunrpc/xprtrdma/verbs.c
>> @@ -962,44 +962,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>> {
>> struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
>> struct rpcrdma_ia *ia = &r_xprt->rx_ia;
>> - struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
>> - char *p;
>> - size_t len;
>> int i, rc;
>>
>> - buf->rb_max_requests = cdata->max_requests;
>> + buf->rb_max_requests = r_xprt->rx_data.max_requests;
>> spin_lock_init(&buf->rb_lock);
>>
>> - /* Need to allocate:
>> - * 1. arrays for send and recv pointers
>> - * 2. arrays of struct rpcrdma_req to fill in pointers
>> - * 3. array of struct rpcrdma_rep for replies
>> - * Send/recv buffers in req/rep need to be registered
>> - */
>> - len = buf->rb_max_requests *
>> - (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
>> -
>> - p = kzalloc(len, GFP_KERNEL);
>> - if (p == NULL) {
>> - dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
>> - __func__, len);
>> - rc = -ENOMEM;
>> - goto out;
>> - }
>> - buf->rb_pool = p; /* for freeing it later */
>> -
>> - buf->rb_send_bufs = (struct rpcrdma_req **) p;
>> - p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
>> - buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
>> - p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
>> -
>> rc = ia->ri_ops->ro_init(r_xprt);
>> if (rc)
>> goto out;
>>
>> + INIT_LIST_HEAD(&buf->rb_send_bufs);
>> for (i = 0; i < buf->rb_max_requests; i++) {
>> struct rpcrdma_req *req;
>> - struct rpcrdma_rep *rep;
>>
>> req = rpcrdma_create_req(r_xprt);
>> if (IS_ERR(req)) {
>> @@ -1008,7 +982,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>> rc = PTR_ERR(req);
>> goto out;
>> }
>> - buf->rb_send_bufs[i] = req;
>> + list_add(&req->rl_free, &buf->rb_send_bufs);
>> + }
>> +
>> + INIT_LIST_HEAD(&buf->rb_recv_bufs);
>> + for (i = 0; i < buf->rb_max_requests + 2; i++) {
>> + struct rpcrdma_rep *rep;
>>
>> rep = rpcrdma_create_rep(r_xprt);
>> if (IS_ERR(rep)) {
>> @@ -1017,7 +996,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>> rc = PTR_ERR(rep);
>> goto out;
>> }
>> - buf->rb_recv_bufs[i] = rep;
>> + list_add(&rep->rr_list, &buf->rb_recv_bufs);
>> }
>>
>> return 0;
>> @@ -1051,25 +1030,26 @@ void
>> rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
>> {
>> struct rpcrdma_ia *ia = rdmab_to_ia(buf);
>> - int i;
>>
>> - /* clean up in reverse order from create
>> - * 1. recv mr memory (mr free, then kfree)
>> - * 2. send mr memory (mr free, then kfree)
>> - * 3. MWs
>> - */
>> - dprintk("RPC: %s: entering\n", __func__);
>> + while (!list_empty(&buf->rb_recv_bufs)) {
>> + struct rpcrdma_rep *rep = list_entry(buf->rb_recv_bufs.next,
>> + struct rpcrdma_rep,
>> + rr_list);
>>
>> - for (i = 0; i < buf->rb_max_requests; i++) {
>> - if (buf->rb_recv_bufs)
>> - rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
>> - if (buf->rb_send_bufs)
>> - rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
>> + list_del(&rep->rr_list);
>> + rpcrdma_destroy_rep(ia, rep);
>> }
>>
>> - ia->ri_ops->ro_destroy(buf);
>> + while (!list_empty(&buf->rb_send_bufs)) {
>> + struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next,
>> + struct rpcrdma_req,
>> + rl_free);
>>
>> - kfree(buf->rb_pool);
>> + list_del(&req->rl_free);
>> + rpcrdma_destroy_req(ia, req);
>> + }
>> +
>> + ia->ri_ops->ro_destroy(buf);
>> }
>>
>> struct rpcrdma_mw *
>> @@ -1102,24 +1082,27 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
>> }
>>
>> static void
>> -rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
>> +rpcrdma_buffer_put_locked(struct rpcrdma_rep *rep, struct rpcrdma_buffer *buf)
>> {
>> - buf->rb_send_bufs[--buf->rb_send_index] = req;
>> - req->rl_niovs = 0;
>> - if (req->rl_reply) {
>> - buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
>> - req->rl_reply = NULL;
>> - }
>> + list_add_tail(&rep->rr_list, &buf->rb_recv_bufs);
>> +}
>> +
>> +static struct rpcrdma_rep *
>> +rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf)
>> +{
>> + struct rpcrdma_rep *rep;
>> +
>> + rep = list_first_entry(&buf->rb_recv_bufs,
>> + struct rpcrdma_rep, rr_list);
>> + list_del(&rep->rr_list);
>> +
>> + return rep;
>> }
>
> There seems to be a distinction between send/recv buffers. Would it
> make sense to have a symmetric handling for both send/recv buffers?
Or maybe the same helpers could handle both. I’ll have a look
when I get back from SNIA SDC.
>> /*
>> * Get a set of request/reply buffers.
>> *
>> - * Reply buffer (if needed) is attached to send buffer upon return.
>> - * Rule:
>> - * rb_send_index and rb_recv_index MUST always be pointing to the
>> - * *next* available buffer (non-NULL). They are incremented after
>> - * removing buffers, and decremented *before* returning them.
>> + * Reply buffer (if available) is attached to send buffer upon return.
>> */
>> struct rpcrdma_req *
>> rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
>> @@ -1129,25 +1112,22 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
>>
>> spin_lock_irqsave(&buffers->rb_lock, flags);
>>
>> - if (buffers->rb_send_index == buffers->rb_max_requests) {
>> + if (list_empty(&buffers->rb_send_bufs)) {
>> spin_unlock_irqrestore(&buffers->rb_lock, flags);
>> - dprintk("RPC: %s: out of request buffers\n", __func__);
>> - return ((struct rpcrdma_req *)NULL);
>> - }
>> -
>> - req = buffers->rb_send_bufs[buffers->rb_send_index];
>> - if (buffers->rb_send_index < buffers->rb_recv_index) {
>> - dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
>> - __func__,
>> - buffers->rb_recv_index - buffers->rb_send_index);
>> - req->rl_reply = NULL;
>> - } else {
>> - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
>> - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
>> + pr_warn("RPC: %s: out of request buffers\n", __func__);
>> + return NULL;
>> }
>> - buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
>> + req = list_first_entry(&buffers->rb_send_bufs,
>> + struct rpcrdma_req, rl_free);
>> + list_del(&req->rl_free);
>>
>> + req->rl_reply = NULL;
>> + if (!list_empty(&buffers->rb_recv_bufs))
>> + req->rl_reply = rpcrdma_buffer_get_locked(buffers);
>
> Would it make sense to check !list_empty() inside _get_locked and handle
> a possible NULL return?
>
>> spin_unlock_irqrestore(&buffers->rb_lock, flags);
>> +
>> + if (!req->rl_reply)
>> + pr_warn("RPC: %s: out of reply buffers\n", __func__);
>> return req;
>> }
>>
>> @@ -1159,17 +1139,22 @@ void
>> rpcrdma_buffer_put(struct rpcrdma_req *req)
>> {
>> struct rpcrdma_buffer *buffers = req->rl_buffer;
>> + struct rpcrdma_rep *rep = req->rl_reply;
>> unsigned long flags;
>>
>> + req->rl_niovs = 0;
>> + req->rl_reply = NULL;
>> +
>> spin_lock_irqsave(&buffers->rb_lock, flags);
>> - rpcrdma_buffer_put_sendbuf(req, buffers);
>> + list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
>> + if (rep)
>> + rpcrdma_buffer_put_locked(rep, buffers);
>> spin_unlock_irqrestore(&buffers->rb_lock, flags);
>> }
>>
>> /*
>> * Recover reply buffers from pool.
>> - * This happens when recovering from error conditions.
>> - * Post-increment counter/array index.
>> + * This happens when recovering from disconnect.
>> */
>> void
>> rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
>> @@ -1178,10 +1163,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
>> unsigned long flags;
>>
>> spin_lock_irqsave(&buffers->rb_lock, flags);
>> - if (buffers->rb_recv_index < buffers->rb_max_requests) {
>> - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
>> - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
>> - }
>> + if (!list_empty(&buffers->rb_recv_bufs))
>> + req->rl_reply = rpcrdma_buffer_get_locked(buffers);
>> spin_unlock_irqrestore(&buffers->rb_lock, flags);
>> }
>>
>> @@ -1196,7 +1179,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
>> unsigned long flags;
>>
>> spin_lock_irqsave(&buffers->rb_lock, flags);
>> - buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
>> + rpcrdma_buffer_put_locked(rep, buffers);
>> spin_unlock_irqrestore(&buffers->rb_lock, flags);
>> }
>>
>> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
>> index a13508b..e6a358f 100644
>> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
>> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
>> @@ -252,6 +252,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
>> #define RPCRDMA_MAX_IOVS (2)
>>
>> struct rpcrdma_req {
>> + struct list_head rl_free;
>> unsigned int rl_niovs;
>> unsigned int rl_nchunks;
>> unsigned int rl_connect_cookie;
>> @@ -285,12 +286,10 @@ struct rpcrdma_buffer {
>> struct list_head rb_all;
>> char *rb_pool;
>>
>> - spinlock_t rb_lock; /* protect buf arrays */
>> + spinlock_t rb_lock; /* protect buf lists */
>> + struct list_head rb_send_bufs;
>> + struct list_head rb_recv_bufs;
>> u32 rb_max_requests;
>> - int rb_send_index;
>> - int rb_recv_index;
>> - struct rpcrdma_req **rb_send_bufs;
>> - struct rpcrdma_rep **rb_recv_bufs;
>> };
>> #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
>>
>>
--
Chuck Lever
On Mon, Sep 21, 2015 at 9:15 PM, Chuck Lever <[email protected]> wrote:
>
>> On Sep 21, 2015, at 1:51 AM, Devesh Sharma <[email protected]> wrote:
>>
>> On Sun, Sep 20, 2015 at 4:05 PM, Sagi Grimberg <[email protected]> wrote:
>>>>> It is possible that in a given poll_cq
>>>>> call you end up getting on 1 completion, the other completion is
>>>>> delayed due to some reason.
>>>>
>>>>
>>>> If a CQE is allowed to be delayed, how does polling
>>>> again guarantee that the consumer can retrieve it?
>>>>
>>>> What happens if a signal occurs, there is only one CQE,
>>>> but it is delayed? ib_poll_cq would return 0 in that
>>>> case, and the consumer would never call again, thinking
>>>> the CQ is empty. There's no way the consumer can know
>>>> for sure when a CQ is drained.
>>>>
>>>> If the delayed CQE happens only when there is more
>>>> than one CQE, how can polling multiple WCs ever work
>>>> reliably?
>>>>
>>>> Maybe I don't understand what is meant by delayed.
>>>>
>>>
>>> If I'm not mistaken, Devesh meant that if between ib_poll_cq (where you
>>> polled the last 2 wcs) until the while statement another CQE was
>>> generated then you lost a bit of efficiency. Correct?
>>
>> Yes, That's the point.
>
> I’m optimizing for the common case where 1 CQE is ready
> to be polled. How much of an efficiency loss are you
> talking about, how often would this loss occur, and is
> this a problem for all providers / devices?
Whether the scenario happens is difficult to predict, but it's
quite possible with any vendor, depending on load on the PCI bus,
I guess. This may affect the latency figures, though.
>
> Is this an issue for the current arrangement where 8 WCs
> are polled at a time?
Yes, it's there even today.
>
>
> --
> Chuck Lever
>
>
>
> On Sep 22, 2015, at 1:32 PM, Devesh Sharma <[email protected]> wrote:
>
> On Mon, Sep 21, 2015 at 9:15 PM, Chuck Lever <[email protected]> wrote:
>>
>>> On Sep 21, 2015, at 1:51 AM, Devesh Sharma <[email protected]> wrote:
>>>
>>> On Sun, Sep 20, 2015 at 4:05 PM, Sagi Grimberg <[email protected]> wrote:
>>>>>> It is possible that in a given poll_cq
>>>>>> call you end up getting on 1 completion, the other completion is
>>>>>> delayed due to some reason.
>>>>>
>>>>>
>>>>> If a CQE is allowed to be delayed, how does polling
>>>>> again guarantee that the consumer can retrieve it?
>>>>>
>>>>> What happens if a signal occurs, there is only one CQE,
>>>>> but it is delayed? ib_poll_cq would return 0 in that
>>>>> case, and the consumer would never call again, thinking
>>>>> the CQ is empty. There's no way the consumer can know
>>>>> for sure when a CQ is drained.
>>>>>
>>>>> If the delayed CQE happens only when there is more
>>>>> than one CQE, how can polling multiple WCs ever work
>>>>> reliably?
>>>>>
>>>>> Maybe I don't understand what is meant by delayed.
>>>>>
>>>>
>>>> If I'm not mistaken, Devesh meant that if between ib_poll_cq (where you
>>>> polled the last 2 wcs) until the while statement another CQE was
>>>> generated then you lost a bit of efficiency. Correct?
>>>
>>> Yes, That's the point.
>>
>> I’m optimizing for the common case where 1 CQE is ready
>> to be polled. How much of an efficiency loss are you
>> talking about, how often would this loss occur, and is
>> this a problem for all providers / devices?
>
> Whether the scenario happens is difficult to predict, but it's
> quite possible with any vendor, depending on load on the PCI bus,
> I guess. This may affect the latency figures, though.
>
>>
>> Is this an issue for the current arrangement where 8 WCs
>> are polled at a time?
>
> Yes, it's there even today.
This review comment does not feel closed yet. Maybe it’s
because I don’t understand exactly what the issue is.
Is this the problem that REPORT_MISSED_EVENTS is supposed to
resolve?
A missed WC will result in an RPC/RDMA transport deadlock. In
fact that is the reason for this particular patch (although
it addresses only one source of missed WCs). So I would like
to see that there are no windows here.
I’ve been told the only sure way to address this for every
provider is the classic but inefficient mechanism: poll one WC
at a time until no WC is returned; re-arm; poll again until no
WC is returned.
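Spelled out, that classic pattern is roughly this (a sketch;
handle_wc() stands in for the ULP's completion handling):

        struct ib_wc wc;

        /* drain */
        while (ib_poll_cq(cq, 1, &wc) > 0)
                handle_wc(&wc);

        /* re-arm, then drain again to catch CQEs that arrived
         * between the first drain and the re-arm
         */
        ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
        while (ib_poll_cq(cq, 1, &wc) > 0)
                handle_wc(&wc);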
In the common case this means two extra poll_cq calls that
return nothing. So I claim the current status quo isn’t
good enough :-)
Doug and others have suggested the best place to address
problems with missed WC signals is in the drivers. All of
them should live up to the ib_poll_cq() API contract the
same way. In addition I’d really like to see
- polling and arming work without having to perform extra
unneeded locking of the CQ, and
- polling arrays work without introducing races
Can we have that discussion now, since there is already
some discussion of IB core API fix-ups?
--
Chuck Lever
On Thu, Oct 01, 2015 at 12:37:36PM -0400, Chuck Lever wrote:
> A missed WC will result in an RPC/RDMA transport deadlock. In
> fact that is the reason for this particular patch (although
> it addresses only one source of missed WCs). So I would like
> to see that there are no windows here.
WCs are never missed.
The issue is a race where re-arming the CQ might not work, meaning you
don't get an event.
You can certainly use arrays with poll_cq. There is no race in the API
here.
But you have to use the IB_CQ_REPORT_MISSED_EVENTS scheme to guarantee
the CQ is actually armed or continue to loop again.
Basically you have to loop until ib_req_notify_cq succeeds.
Any driver that doesn't support this is broken, do we know of any?
while (1) {
struct ib_wc wcs[100];
int rc = ib_poll_cq(cq, NELEMS(wcs), wcs);
.. process rc wcs ..
if (rc != NELEMS(wcs))
if (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
IB_CQ_REPORT_MISSED_EVENTS) == 0)
break;
}
API wise, we should probably look at forcing
IB_CQ_REPORT_MISSED_EVENTS on and dropping the flag.
Jason
> On Oct 1, 2015, at 1:13 PM, Jason Gunthorpe <[email protected]> wrote:
>
> On Thu, Oct 01, 2015 at 12:37:36PM -0400, Chuck Lever wrote:
>
>> A missed WC will result in an RPC/RDMA transport deadlock. In
>> fact that is the reason for this particular patch (although
>> it addresses only one source of missed WCs). So I would like
>> to see that there are no windows here.
>
> WCs are never missed.
The review comment earlier in this thread suggested there is
a race condition where a WC can be “delayed” resulting in,
well, I’m still not certain what the consequences are.
There was some interest in going back to a single WC array
argument when calling ib_poll_cq. I’d like to avoid that.
If there is an issue using a WC array, I’d like to
understand what it is.
> The issue is a race where re-arming the CQ might not work, meaning you
> don't get an event.
>
> You can certainly use arrays with poll_cq. There is no race in the API
> here.
>
> But you have to use the IB_CQ_REPORT_MISSED_EVENTS scheme to guarantee
> the CQ is actually armed or continue to loop again.
>
> Basically you have to loop until ib_req_notify_cq succeeds.
>
> Any driver that doesn't support this is broken, do we know of any?
>
> while (1) {
> struct ib_wc wcs[100];
> int rc = ib_poll_cq(cw, NELEMS(wcs), wcs);
>
> .. process rc wcs ..
>
> if (rc != NELEMS(wcs))
> if (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
> IB_CQ_REPORT_MISSED_EVENTS) == 0)
> break;
> }
>
> API wise, we should probably look at forcing
> IB_CQ_REPORT_MISSED_EVENTS on and dropping the flag.
It’s been suggested that it’s not clear what a positive
return value from ib_req_notify_cq() means when the
REPORT_MISSED_EVENTS flag is set: does it mean that
the CQ has been re-armed? I had assumed that a positive
RC meant both missed events and a successful re-arm,
but the pseudo-code above suggests that is not the
case.
--
Chuck Lever
On Thu, Oct 01, 2015 at 01:36:26PM -0400, Chuck Lever wrote:
> >> A missed WC will result in an RPC/RDMA transport deadlock. In
> >> fact that is the reason for this particular patch (although
> >> it addresses only one source of missed WCs). So I would like
> >> to see that there are no windows here.
> >
> > WCs are never missed.
>
> The review comment earlier in this thread suggested there is
> a race condition where a WC can be “delayed” resulting in,
> well, I’m still not certain what the consequences are.
Yes. The consequence would typically be lockup of CQ processing.
> > while (1) {
> > struct ib_wc wcs[100];
> > int rc = ib_poll_cq(cw, NELEMS(wcs), wcs);
> >
> > .. process rc wcs ..
> >
> > if (rc != NELEMS(wcs))
> > if (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
> > IB_CQ_REPORT_MISSED_EVENTS) == 0)
> > break;
> > }
> >
> > API wise, we should probably look at forcing
> > IB_CQ_REPORT_MISSED_EVENTS on and dropping the flag.
>
> It’s been suggested that it’s not clear what a positive
> return value from ib_req_notify_cq() means when the
> REPORT_MISSED_EVENTS flags is set: does it mean that
> the CQ has been re-armed? I had assumed that a positive
> RC meant both missed events and a successful re-arm,
> but the pseudo-code above suggests that is not the
> case.
The ULP must assume the CQ has NOT been armed after a positive return.
What the driver does to the arm state is undefined - for instance the
driver may trigger a callback and still return 1 here.
However, the driver must make this guarantee:
If ib_req_notify_cq(IB_CQ_REPORT_MISSED_EVENTS) returns 0 then
the callback will always be called when the CQ is non-empty.
The ULP must keep polling until the above happens; otherwise the
event notification may be missed.
I.e. the above is guaranteed to close the WC delay/lockup race.
Again, if there has been confusion on the driver side, drivers that
don't implement the above are broken.
Review Roland's original commit comments on this feature.
https://github.com/jgunthorpe/linux/commit/ed23a72778f3dbd465e55b06fe31629e7e1dd2f3
I'm not sure where we are at on the 'significant overhead for some
low-level drivers' issue, but assuming that is still the case, then
the recommendation is this:
bool exiting = false;
while (1) {
struct ib_wc wcs[100];
int rc = ib_poll_cq(cq, NELEMS(wcs), wcs);
if (rc == 0 && exiting)
break;
.. process rc wcs ..
if (rc != NELEMS(wcs)) {
ib_req_notify_cq(cq, IB_CQ_NEXT_COMP)
exiting = true;
} else
exiting = false;
}
ie a double poll.
AFAIK, this is a common pattern in the ULPs. Perhaps we should
implement this as a core API:
struct ib_wc wcs[100];
while ((rc = ib_poll_cq_and_arm(cq, NELEMS(wcs), wcs)) != 0) {
.. process rc wcs ..
ib_poll_cq_and_arm reads wcs off the CQ. If it returns 0 then the
callback is guaranteed to happen when the CQ is non-empty.
Jason
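A minimal sketch of how such a helper could behave, using only the
guarantees described above (hypothetical; no such core API exists
today):

static int ib_poll_cq_and_arm(struct ib_cq *cq, int num_entries,
                              struct ib_wc *wcs)
{
        int rc;

        rc = ib_poll_cq(cq, num_entries, wcs);
        if (rc)
                return rc;      /* completions to process, or an error */

        /* The CQ looked empty: try to arm it.  A positive return from
         * ib_req_notify_cq() means CQEs may have slipped in, so poll
         * again before reporting "empty and armed".
         */
        while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
                                IB_CQ_REPORT_MISSED_EVENTS) > 0) {
                rc = ib_poll_cq(cq, num_entries, wcs);
                if (rc)
                        return rc;
        }

        /* A return of 0 means the CQ is armed: the completion callback
         * is guaranteed to run when the CQ next becomes non-empty.
         */
        return 0;
}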
> On Oct 1, 2015, at 2:15 PM, Jason Gunthorpe <[email protected]> wrote:
>
> On Thu, Oct 01, 2015 at 01:36:26PM -0400, Chuck Lever wrote:
>
>>>> A missed WC will result in an RPC/RDMA transport deadlock. In
>>>> fact that is the reason for this particular patch (although
>>>> it addresses only one source of missed WCs). So I would like
>>>> to see that there are no windows here.
>>>
>>> WCs are never missed.
>>
>> The review comment earlier in this thread suggested there is
>> a race condition where a WC can be “delayed” resulting in,
>> well, I’m still not certain what the consequences are.
>
> Yes. The consequence would typically be lockup of CQ processing.
>
>>> while (1) {
>>> struct ib_wc wcs[100];
>>> int rc = ib_poll_cq(cw, NELEMS(wcs), wcs);
>>>
>>> .. process rc wcs ..
>>>
>>> if (rc != NELEMS(wcs))
>>> if (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
>>> IB_CQ_REPORT_MISSED_EVENTS) == 0)
>>> break;
>>> }
>>>
>>> API wise, we should probably look at forcing
>>> IB_CQ_REPORT_MISSED_EVENTS on and dropping the flag.
>>
>> It’s been suggested that it’s not clear what a positive
>> return value from ib_req_notify_cq() means when the
>> REPORT_MISSED_EVENTS flags is set: does it mean that
>> the CQ has been re-armed? I had assumed that a positive
>> RC meant both missed events and a successful re-arm,
>> but the pseudo-code above suggests that is not the
>> case.
>
> The ULP must assume the CQ has NOT been armed after a positive return.
OK, I will fix this when I revise 03/18.
> What the driver does to the arm state is undefined - for instance the
> driver may trigger a callback and still return 1 here.
>
> However, the driver must make this guarentee:
>
> If ib_req_notify_cq(IB_CQ_REPORT_MISSED_EVENTS) returns 0 then
> the call back will always be called when the CQ is non-empty.
>
> The ULP must loop doing polling until the above happens, otherwise the
> event notification may be missed.
>
> ie the above is guarnteed to close the WC delay/lockup race.
>
> Again, if there has been confusion on the driver side, drivers that
> don't implement the above are broken.
>
> Review Roland's original commit comments on this feature.
>
> https://github.com/jgunthorpe/linux/commit/ed23a72778f3dbd465e55b06fe31629e7e1dd2f3
>
> I'm not sure where we are at on the 'significant overhead for some
> low-level drivers' issue, but assuming that is still the case, then
> the recommendation is this:
>
> bool exiting = false;
> while (1) {
> struct ib_wc wcs[100];
> int rc = ib_poll_cq(cq, NELEMS(wcs), wcs);
> if (rc == 0 && exiting)
> break;
>
> .. process rc wcs ..
>
> if (rc != NELEMS(wcs)) {
> ib_req_notify_cq(cq, IB_CQ_NEXT_COMP)
> exiting = true;
> } else
> exiting = false;
> }
>
> ie a double poll.
> AFAIK, this is a common pattern in the ULPs.. Perhaps we should
> implement this as a core API:
>
> struct ib_wc wcs[100];
> while ((rc = ib_poll_cq_and_arm(cq, NELEMS(wcs), wcs)) != 0) {
> .. process rc wcs ..
>
> ib_poll_cq_and_arm reads wcs off the CQ. If it returns 0 then the
> callback is guarenteed to happen when the CQ is non empty.
This makes sense to me, especially if it means fewer
acquisitions and releases of the CQ’s spinlock.
Does a ULP want to continue polling if ib_poll_cq{_and_arm}
returns a negative RC?
--
Chuck Lever
On Thu, Oct 01, 2015 at 02:31:41PM -0400, Chuck Lever wrote:
> Does a ULP want to continue polling if ib_poll_cq{_and_arm}
> returns a negative RC?
No. We should try to figure out whether that can even happen; if not,
get rid of the possibility.
If it can happen, it can only mean the CQ is busted and needs to be
destroyed, i.e. the whole ULP probably needs to restart.
Jason