Return-Path: Received: from mail-it0-f68.google.com ([209.85.214.68]:33187 "EHLO mail-it0-f68.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755430AbcFGTrK (ORCPT ); Tue, 7 Jun 2016 15:47:10 -0400 Subject: [PATCH v1 06/20] xprtrdma: Refactor MR recovery work queues From: Chuck Lever To: linux-rdma@vger.kernel.org, linux-nfs@vger.kernel.org Date: Tue, 07 Jun 2016 15:47:08 -0400 Message-ID: <20160607194708.18401.79092.stgit@manet.1015granger.net> In-Reply-To: <20160607194001.18401.88592.stgit@manet.1015granger.net> References: <20160607194001.18401.88592.stgit@manet.1015granger.net> MIME-Version: 1.0 Content-Type: text/plain; charset="utf-8" Sender: linux-nfs-owner@vger.kernel.org List-ID: I found that commit ead3f26e359e ("xprtrdma: Add ro_unmap_safe memreg method"), which introduces ro_unmap_safe, never wired up the FMR recovery worker. The FMR and FRWR recovery work queues both do the same thing. Instead of setting up separate individual work queues for this, schedule a delayed worker to deal with them, since recovering MRs is not performance-critical. Fixes: ead3f26e359e ("xprtrdma: Add ro_unmap_safe memreg method") Signed-off-by: Chuck Lever --- net/sunrpc/xprtrdma/fmr_ops.c | 85 +++++++++++---------------------------- net/sunrpc/xprtrdma/frwr_ops.c | 68 ++++++------------------------- net/sunrpc/xprtrdma/transport.c | 11 ----- net/sunrpc/xprtrdma/verbs.c | 42 +++++++++++++++++++ net/sunrpc/xprtrdma/xprt_rdma.h | 11 +++-- 5 files changed, 87 insertions(+), 130 deletions(-) diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index b581a28..45800c9 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -35,30 +35,6 @@ /* Maximum scatter/gather per FMR */ #define RPCRDMA_MAX_FMR_SGES (64) -static struct workqueue_struct *fmr_recovery_wq; - -#define FMR_RECOVERY_WQ_FLAGS (WQ_UNBOUND) - -int -fmr_alloc_recovery_wq(void) -{ - fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0); - return !fmr_recovery_wq ? -ENOMEM : 0; -} - -void -fmr_destroy_recovery_wq(void) -{ - struct workqueue_struct *wq; - - if (!fmr_recovery_wq) - return; - - wq = fmr_recovery_wq; - fmr_recovery_wq = NULL; - destroy_workqueue(wq); -} - static int __fmr_unmap(struct rpcrdma_mw *mw) { @@ -68,54 +44,30 @@ __fmr_unmap(struct rpcrdma_mw *mw) return ib_unmap_fmr(&l); } -static void -__fmr_dma_unmap(struct rpcrdma_mw *mw) -{ - struct rpcrdma_xprt *r_xprt = mw->mw_xprt; - - ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, - mw->mw_sg, mw->mw_nents, mw->mw_dir); - rpcrdma_put_mw(r_xprt, mw); -} - +/* Reset of a single FMR. + * + * There's no recovery if this fails. The FMR is abandoned, but + * remains in rb_all. It will be cleaned up when the transport is + * destroyed. + */ static void __fmr_reset_and_unmap(struct rpcrdma_mw *mw) { + struct rpcrdma_xprt *r_xprt = mw->mw_xprt; int rc; /* ORDER */ rc = __fmr_unmap(mw); + ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, + mw->mw_sg, mw->mw_nents, mw->mw_dir); if (rc) { pr_warn("rpcrdma: ib_unmap_fmr status %d, fmr %p orphaned\n", rc, mw); return; } - __fmr_dma_unmap(mw); + rpcrdma_put_mw(r_xprt, mw); } -/* Deferred reset of a single FMR. Generate a fresh rkey by - * replacing the MR. There's no recovery if this fails. - */ -static void -__fmr_recovery_worker(struct work_struct *work) -{ - struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw, - mw_work); - - __fmr_reset_and_unmap(mw); - return; -} - -/* A broken MR was discovered in a context that can't sleep. - * Defer recovery to the recovery worker. - */ -static void -__fmr_queue_recovery(struct rpcrdma_mw *mw) -{ - INIT_WORK(&mw->mw_work, __fmr_recovery_worker); - queue_work(fmr_recovery_wq, &mw->mw_work); -} - static void __fmr_release(struct rpcrdma_mw *r) { @@ -142,6 +94,12 @@ fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, return 0; } +static void +fmr_op_recover_mr(struct rpcrdma_mw *mw) +{ + __fmr_reset_and_unmap(mw); +} + /* FMR mode conveys up to 64 pages of payload per chunk segment. */ static size_t @@ -287,7 +245,9 @@ out_maperr: pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n", len, (unsigned long long)dma_pages[0], pageoff, mw->mw_nents, rc); - __fmr_dma_unmap(mw); + ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, + mw->mw_sg, mw->mw_nents, mw->mw_dir); + rpcrdma_put_mw(r_xprt, mw); return rc; } @@ -331,7 +291,9 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) seg = &req->rl_segments[i]; mw = seg->rl_mw; - __fmr_dma_unmap(mw); + ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, + mw->mw_sg, mw->mw_nents, mw->mw_dir); + rpcrdma_put_mw(r_xprt, mw); i += seg->mr_nsegs; seg->mr_nsegs = 0; @@ -359,7 +321,7 @@ fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, if (sync) __fmr_reset_and_unmap(mw); else - __fmr_queue_recovery(mw); + rpcrdma_defer_mr_recovery(mw); i += seg->mr_nsegs; seg->mr_nsegs = 0; @@ -384,6 +346,7 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { .ro_map = fmr_op_map, .ro_unmap_sync = fmr_op_unmap_sync, .ro_unmap_safe = fmr_op_unmap_safe, + .ro_recover_mr = fmr_op_recover_mr, .ro_open = fmr_op_open, .ro_maxpages = fmr_op_maxpages, .ro_init = fmr_op_init, diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index a476fa6..19bddfe 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -73,31 +73,6 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif -static struct workqueue_struct *frwr_recovery_wq; - -#define FRWR_RECOVERY_WQ_FLAGS (WQ_UNBOUND | WQ_MEM_RECLAIM) - -int -frwr_alloc_recovery_wq(void) -{ - frwr_recovery_wq = alloc_workqueue("frwr_recovery", - FRWR_RECOVERY_WQ_FLAGS, 0); - return !frwr_recovery_wq ? -ENOMEM : 0; -} - -void -frwr_destroy_recovery_wq(void) -{ - struct workqueue_struct *wq; - - if (!frwr_recovery_wq) - return; - - wq = frwr_recovery_wq; - frwr_recovery_wq = NULL; - destroy_workqueue(wq); -} - static int __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) { @@ -124,6 +99,12 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) return 0; } +/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR. + * + * There's no recovery if this fails. The FRMR is abandoned, but + * remains in rb_all. It will be cleaned up when the transport is + * destroyed. + */ static void __frwr_reset_and_unmap(struct rpcrdma_mw *mw) { @@ -138,30 +119,10 @@ __frwr_reset_and_unmap(struct rpcrdma_mw *mw) rpcrdma_put_mw(r_xprt, mw); } -/* Deferred reset of a single FRMR. Generate a fresh rkey by - * replacing the MR. - * - * There's no recovery if this fails. The FRMR is abandoned, but - * remains in rb_all. It will be cleaned up when the transport is - * destroyed. - */ -static void -__frwr_recovery_worker(struct work_struct *work) -{ - struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, - mw_work); - - __frwr_reset_and_unmap(r); -} - -/* A broken MR was discovered in a context that can't sleep. - * Defer recovery to the recovery worker. - */ static void -__frwr_queue_recovery(struct rpcrdma_mw *r) +frwr_op_recover_mr(struct rpcrdma_mw *mw) { - INIT_WORK(&r->mw_work, __frwr_recovery_worker); - queue_work(frwr_recovery_wq, &r->mw_work); + __frwr_reset_and_unmap(mw); } static int @@ -399,7 +360,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, seg1->rl_mw = NULL; do { if (mw) - __frwr_queue_recovery(mw); + rpcrdma_defer_mr_recovery(mw); mw = rpcrdma_get_mw(r_xprt); if (!mw) return -ENOMEM; @@ -481,12 +442,11 @@ out_mapmr_err: pr_err("rpcrdma: failed to register mr %p (%u/%u)\n", frmr->fr_mr, n, mw->mw_nents); rc = n < 0 ? n : -EIO; - __frwr_queue_recovery(mw); + rpcrdma_defer_mr_recovery(mw); return rc; out_senderr: - pr_err("rpcrdma: ib_post_send status %i\n", rc); - __frwr_queue_recovery(mw); + rpcrdma_defer_mr_recovery(mw); return rc; } @@ -627,7 +587,7 @@ frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, if (sync) __frwr_reset_and_unmap(mw); else - __frwr_queue_recovery(mw); + rpcrdma_defer_mr_recovery(mw); i += seg->mr_nsegs; seg->mr_nsegs = 0; @@ -640,9 +600,6 @@ frwr_op_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_mw *r; - /* Ensure stale MWs for "buf" are no longer in flight */ - flush_workqueue(frwr_recovery_wq); - while (!list_empty(&buf->rb_all)) { r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); list_del(&r->mw_all); @@ -655,6 +612,7 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { .ro_map = frwr_op_map, .ro_unmap_sync = frwr_op_unmap_sync, .ro_unmap_safe = frwr_op_unmap_safe, + .ro_recover_mr = frwr_op_recover_mr, .ro_open = frwr_op_open, .ro_maxpages = frwr_op_maxpages, .ro_init = frwr_op_init, diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 99d2e5b..29c91bf 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -741,7 +741,6 @@ void xprt_rdma_cleanup(void) __func__, rc); rpcrdma_destroy_wq(); - frwr_destroy_recovery_wq(); rc = xprt_unregister_transport(&xprt_rdma_bc); if (rc) @@ -753,20 +752,13 @@ int xprt_rdma_init(void) { int rc; - rc = frwr_alloc_recovery_wq(); - if (rc) - return rc; - rc = rpcrdma_alloc_wq(); - if (rc) { - frwr_destroy_recovery_wq(); + if (rc) return rc; - } rc = xprt_register_transport(&xprt_rdma); if (rc) { rpcrdma_destroy_wq(); - frwr_destroy_recovery_wq(); return rc; } @@ -774,7 +766,6 @@ int xprt_rdma_init(void) if (rc) { xprt_unregister_transport(&xprt_rdma); rpcrdma_destroy_wq(); - frwr_destroy_recovery_wq(); return rc; } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 4f7e1ba..e9f7f727 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -762,6 +762,41 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ib_drain_qp(ia->ri_id->qp); } +static void +rpcrdma_mr_recovery_worker(struct work_struct *work) +{ + struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, + rb_recovery_worker.work); + struct rpcrdma_mw *mw; + + spin_lock(&buf->rb_recovery_lock); + while (!list_empty(&buf->rb_stale_mrs)) { + mw = list_first_entry(&buf->rb_stale_mrs, + struct rpcrdma_mw, mw_list); + list_del_init(&mw->mw_list); + spin_unlock(&buf->rb_recovery_lock); + + dprintk("RPC: %s: recovering MR %p\n", __func__, mw); + mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw); + + spin_lock(&buf->rb_recovery_lock); + }; + spin_unlock(&buf->rb_recovery_lock); +} + +void +rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw) +{ + struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + + spin_lock(&buf->rb_recovery_lock); + list_add(&mw->mw_list, &buf->rb_stale_mrs); + spin_unlock(&buf->rb_recovery_lock); + + schedule_delayed_work(&buf->rb_recovery_worker, 0); +} + struct rpcrdma_req * rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) { @@ -863,6 +898,11 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) list_add(&rep->rr_list, &buf->rb_recv_bufs); } + INIT_DELAYED_WORK(&buf->rb_recovery_worker, + rpcrdma_mr_recovery_worker); + spin_lock_init(&buf->rb_recovery_lock); + INIT_LIST_HEAD(&buf->rb_stale_mrs); + return 0; out: rpcrdma_buffer_destroy(buf); @@ -911,6 +951,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_ia *ia = rdmab_to_ia(buf); + cancel_delayed_work_sync(&buf->rb_recovery_worker); + while (!list_empty(&buf->rb_recv_bufs)) { struct rpcrdma_rep *rep; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 3472844..8eb02ab 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -244,7 +244,6 @@ struct rpcrdma_mw { struct rpcrdma_fmr fmr; struct rpcrdma_frmr frmr; }; - struct work_struct mw_work; struct rpcrdma_xprt *mw_xprt; struct list_head mw_all; }; @@ -336,6 +335,10 @@ struct rpcrdma_buffer { struct list_head rb_allreqs; u32 rb_bc_max_requests; + + spinlock_t rb_recovery_lock; + struct list_head rb_stale_mrs; + struct delayed_work rb_recovery_worker; }; #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) @@ -395,6 +398,7 @@ struct rpcrdma_memreg_ops { struct rpcrdma_req *); void (*ro_unmap_safe)(struct rpcrdma_xprt *, struct rpcrdma_req *, bool); + void (*ro_recover_mr)(struct rpcrdma_mw *); int (*ro_open)(struct rpcrdma_ia *, struct rpcrdma_ep *, struct rpcrdma_create_data_internal *); @@ -471,6 +475,8 @@ void rpcrdma_buffer_put(struct rpcrdma_req *); void rpcrdma_recv_buffer_get(struct rpcrdma_req *); void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); +void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *); + struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *, size_t, gfp_t); void rpcrdma_free_regbuf(struct rpcrdma_ia *, @@ -478,9 +484,6 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *, int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); -int frwr_alloc_recovery_wq(void); -void frwr_destroy_recovery_wq(void); - int rpcrdma_alloc_wq(void); void rpcrdma_destroy_wq(void);