Subject: [PATCH for-next v3 0/7] On-Demand Paging on SoftRoCE

This patch series implements the On-Demand Paging feature on SoftRoCE(rxe)
driver, which has been available only in mlx5 driver[1] so far.

As an important change, it is necessary to convert triple tasklets
(requester, responder and completer) to workqueue because they must be
able to sleep in order to trigger page fault before accessing MRs. There
have been some discussions about this, and Bob Pearson posted a patchset[2]
for the conversion. I found it causes soft lockup and reported to him on
18th Nov. However, there has been no progress since then. The issue has
been a blocker to this ODP patchset for more than three months since the
first post of the RFC patchset.

As his patchset, consisting of 13 patches, is too large and complicated to
find the cause of soft lockup, I have prepared a patch that realizes the
conversion without the problem. I tried to minimize changes so that he can
easily make the changes he originally intended upon it.

[Overview]
When applications register a memory region(MR), RDMA drivers normally pin
pages in the MR so that physical addresses are never changed during RDMA
communication. This requires the MR to fit in physical memory and
inevitably leads to memory pressure. On the other hand, On-Demand Paging
(ODP) allows applications to register MRs without pinning pages. They are
paged-in when the driver requires and paged-out when the OS reclaims. As a
result, it is possible to register a large MR that does not fit in physical
memory without taking up so much physical memory.

[Why to add this feature?]
We, Fujitsu, have contributed to RDMA with a view to using it with
persistent memory. Persistent memory can host a filesystem that allows
applications to read/write files directly without involving page cache.
This is called FS-DAX(filesystem direct access) mode. There is a problem
that data on DAX-enabled filesystem cannot be duplicated with software RAID
or other hardware methods. Data replication with RDMA, which features
high-speed connections, is the best solution for the problem.

However, there is a known issue that hinders using RDMA with FS-DAX. When
RDMA operations to a file and update of the file metadata are processed
concurrently on the same node, illegal memory accesses can be executed,
disregarding the updated metadata. This is because RDMA operations do not
go through page cache but access data directly. There was an effort[3] to
solve this problem, but it was rejected in the end. Though there is no
general solution available, it is possible to work around the problem using
the ODP feature. It enables the kernel driver to update metadata before
processing RDMA operations.

We have enhanced the rxe to expedite the usage of persistent memory. Our
contribution to rxe includes RDMA Atomic write[4] and RDMA Flush[5]. With
them being merged along with ODP, an environment will be ready for
developers to create and test software for RDMA with FS-DAX. There is a
library(librpma)[6] being developed for this purpose. This environment can
be used by anybody without any special hardware but an ordinary computer
with a normal NIC though it is inferior to hardware implementations in
terms of performance.

[Design considerations]
ODP has been available only in mlx5, but functions and data structures
that can be used commonly are provided in ib_uverbs(infiniband/core). The
interfaces are heavily dependent on HMM infrastructure[7], and this
patchset use them as much as possible. While mlx5 has both Explicit and
Implicit ODP features along with prefetch feature, this patchset implements
the Explicit ODP feature only.

As responder and completer sleep, it is more likely that packet drop occurs
because of overflow in receiver queue. There are multiple queues involved,
but, as SoftRoCE uses UDP, the most important one would be the UDP buffers.
The size can be configured in net.core.rmem_default and net.core.rmem_max
sysconfig parameters. Users should change these values in case of packet
drop, but page fault would be typically not so long as to cause the
problem.

[How does ODP work?]
"struct ib_umem_odp" is used to manage pages. It is created for each
ODP-enabled MR on its registration. This struct holds a pair of arrays
(dma_list/pfn_list) that serve as a driver page table. DMA addresses and
PFNs are stored in the driver page table. They are updated on page-in and
page-out, both of which use the common interfaces in the ib_uverbs layer.

Page-in can occur when requester, responder or completer access an MR in
order to process RDMA operations. If they find that the pages being
accessed are not present on physical memory or requisite permissions are
not set on the pages, they provoke page fault to make pages present with
proper permissions and at the same time update the driver page table. After
confirming the presence of the pages, they execute memory access such as
read, write or atomic operations.

Page-out is triggered by page reclaim or filesystem events (e.g. metadata
update of a file that is being used as an MR). When creating an ODP-enabled
MR, the driver registers an MMU notifier callback. When the kernel issues a
page invalidation notification, the callback is provoked to unmap DMA
addresses and update the driver page table. After that, the kernel releases
the pages.

[Supported operations]
All traditional operations are supported on RC connection. The new Atomic
write[4] and RDMA Flush[5] operations are not included in this patchset. I
shall post them later after this patchset is merged. On UD connection,
Send, Recv, and SRQ-Recv are supported.

[How to test ODP?]
There are only a few resources available for testing. pyverbs testcases in
rdma-core and perftest[8] are recommendable ones. Other than them, the
ibv_rc_pingpong command can also used for testing. Note that you may have
to build perftest from upstream because older versions do not handle ODP
capabilities correctly.

The tree is available from the URL below:
https://github.com/daimatsuda/linux/tree/odp_v3

[Future work]
My next work is to enable the new Atomic write[4] and RDMA Flush[5]
operations with ODP. After that, I shall implement the prefetch feature. It
allows applications to trigger page fault using ibv_advise_mr(3) to
optimize performance. Some existing software like librpma[6] use this
feature. Additionally, I think we can also add the implicit ODP feature in
the future.


[1] [RFC 00/20] On demand paging
https://www.spinics.net/lists/linux-rdma/msg18906.html

[2] [PATCH for-next v3 00/13] Implement work queues for rdma_rxe
https://lore.kernel.org/linux-rdma/[email protected]/

[3] [RFC PATCH v2 00/19] RDMA/FS DAX truncate proposal V1,000,002 ;-)
https://lore.kernel.org/nvdimm/[email protected]/

[4] [PATCH v7 0/8] RDMA/rxe: Add atomic write operation
https://lore.kernel.org/linux-rdma/[email protected]/

[5] [for-next PATCH 00/10] RDMA/rxe: Add RDMA FLUSH operation
https://lore.kernel.org/lkml/[email protected]/

[6] librpma: Remote Persistent Memory Access Library
https://github.com/pmem/rpma

[7] Heterogeneous Memory Management (HMM)
https://www.kernel.org/doc/html/latest/mm/hmm.html

[8] linux-rdma/perftest: Infiniband Verbs Performance Tests
https://github.com/linux-rdma/perftest

[9] tests: ODP testcases for RDMA Write/Read and Atomic operations #1229
https://github.com/linux-rdma/rdma-core/pull/1229

v2->v3:
1) Removed a patch that changes the common ib_uverbs layer.
2) Re-implemented patches for conversion to workqueue.
3) Fixed compile errors (happened when CONFIG_INFINIBAND_ON_DEMAND_PAGING=n).
4) Fixed some functions that returned incorrect errors.
5) Temporarily disabled ODP for RDMA Flush and Atomic Write.

v1->v2:
1) Fixed a crash issue reported by Haris Iqbal.
2) Tried to make lock patters clearer as pointed out by Romanovsky.
3) Minor clean ups and fixes.

Daisuke Matsuda (7):
RDMA/rxe: Convert triple tasklets to use workqueue
RDMA/rxe: Always schedule works before accessing user MRs
RDMA/rxe: Cleanup code for responder Atomic operations
RDMA/rxe: Add page invalidation support
RDMA/rxe: Allow registering MRs for On-Demand Paging
RDMA/rxe: Add support for Send/Recv/Write/Read operations with ODP
RDMA/rxe: Add support for the traditional Atomic operations with ODP

drivers/infiniband/sw/rxe/Makefile | 2 +
drivers/infiniband/sw/rxe/rxe.c | 27 +-
drivers/infiniband/sw/rxe/rxe_comp.c | 22 +-
drivers/infiniband/sw/rxe/rxe_loc.h | 36 ++-
drivers/infiniband/sw/rxe/rxe_mr.c | 7 +-
drivers/infiniband/sw/rxe/rxe_odp.c | 339 ++++++++++++++++++++++++++
drivers/infiniband/sw/rxe/rxe_param.h | 2 +-
drivers/infiniband/sw/rxe/rxe_qp.c | 2 +-
drivers/infiniband/sw/rxe/rxe_recv.c | 4 +-
drivers/infiniband/sw/rxe/rxe_req.c | 2 +-
drivers/infiniband/sw/rxe/rxe_resp.c | 187 +++++++-------
drivers/infiniband/sw/rxe/rxe_resp.h | 46 ++++
drivers/infiniband/sw/rxe/rxe_task.c | 52 +++-
drivers/infiniband/sw/rxe/rxe_task.h | 15 +-
drivers/infiniband/sw/rxe/rxe_verbs.c | 7 +-
drivers/infiniband/sw/rxe/rxe_verbs.h | 2 +
16 files changed, 638 insertions(+), 114 deletions(-)
create mode 100644 drivers/infiniband/sw/rxe/rxe_odp.c
create mode 100644 drivers/infiniband/sw/rxe/rxe_resp.h

--
2.31.1


Subject: [PATCH for-next v3 7/7] RDMA/rxe: Add support for the traditional Atomic operations with ODP

Enable 'fetch and add' and 'compare and swap' operations to manipulate
data in an ODP-enabled MR. This is comprised of the following steps:
1. Check the driver page table(umem_odp->dma_list) to see if the target
page is both readable and writable.
2. If not, then trigger page fault to map the page.
3. Convert its user space address to a kernel logical address using PFNs
in the driver page table(umem_odp->pfn_list).
4. Execute the operation.

umem_mutex is used to ensure that dma_list (an array of addresses of an MR)
is not changed while it is checked and that the target page is not
invalidated before data access completes.

Signed-off-by: Daisuke Matsuda <[email protected]>
---
drivers/infiniband/sw/rxe/rxe.c | 1 +
drivers/infiniband/sw/rxe/rxe_loc.h | 11 +++++++
drivers/infiniband/sw/rxe/rxe_odp.c | 46 ++++++++++++++++++++++++++++
drivers/infiniband/sw/rxe/rxe_resp.c | 2 +-
4 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/drivers/infiniband/sw/rxe/rxe.c b/drivers/infiniband/sw/rxe/rxe.c
index 2c9f0cf96671..30daf14ee0e8 100644
--- a/drivers/infiniband/sw/rxe/rxe.c
+++ b/drivers/infiniband/sw/rxe/rxe.c
@@ -88,6 +88,7 @@ static void rxe_init_device_param(struct rxe_dev *rxe)
rxe->attr.odp_caps.per_transport_caps.rc_odp_caps |= IB_ODP_SUPPORT_RECV;
rxe->attr.odp_caps.per_transport_caps.rc_odp_caps |= IB_ODP_SUPPORT_WRITE;
rxe->attr.odp_caps.per_transport_caps.rc_odp_caps |= IB_ODP_SUPPORT_READ;
+ rxe->attr.odp_caps.per_transport_caps.rc_odp_caps |= IB_ODP_SUPPORT_ATOMIC;
rxe->attr.odp_caps.per_transport_caps.rc_odp_caps |= IB_ODP_SUPPORT_SRQ_RECV;
}
}
diff --git a/drivers/infiniband/sw/rxe/rxe_loc.h b/drivers/infiniband/sw/rxe/rxe_loc.h
index fb468999e81e..24b0b7069688 100644
--- a/drivers/infiniband/sw/rxe/rxe_loc.h
+++ b/drivers/infiniband/sw/rxe/rxe_loc.h
@@ -7,6 +7,8 @@
#ifndef RXE_LOC_H
#define RXE_LOC_H

+#include "rxe_resp.h"
+
/* rxe_av.c */
void rxe_init_av(struct rdma_ah_attr *attr, struct rxe_av *av);
int rxe_av_chk_attr(struct rxe_qp *qp, struct rdma_ah_attr *attr);
@@ -192,6 +194,8 @@ int rxe_create_user_odp_mr(struct ib_pd *pd, u64 start, u64 length, u64 iova,
int access_flags, struct rxe_mr *mr);
int rxe_odp_mr_copy(struct rxe_mr *mr, u64 iova, void *addr, int length,
enum rxe_mr_copy_dir dir);
+enum resp_states rxe_odp_atomic_ops(struct rxe_qp *qp, struct rxe_pkt_info *pkt,
+ struct rxe_mr *mr);

#else /* CONFIG_INFINIBAND_ON_DEMAND_PAGING */
static inline int
@@ -204,6 +208,13 @@ static inline int
rxe_odp_mr_copy(struct rxe_mr *mr, u64 iova, void *addr,
int length, enum rxe_mr_copy_dir dir) { return 0; }

+static inline enum resp_states
+rxe_odp_atomic_ops(struct rxe_qp *qp, struct rxe_pkt_info *pkt,
+ struct rxe_mr *mr)
+{
+ return RESPST_ERR_UNSUPPORTED_OPCODE;
+}
+
#endif /* CONFIG_INFINIBAND_ON_DEMAND_PAGING */

#endif /* RXE_LOC_H */
diff --git a/drivers/infiniband/sw/rxe/rxe_odp.c b/drivers/infiniband/sw/rxe/rxe_odp.c
index c55512417d11..6e0b6a872ddc 100644
--- a/drivers/infiniband/sw/rxe/rxe_odp.c
+++ b/drivers/infiniband/sw/rxe/rxe_odp.c
@@ -291,3 +291,49 @@ int rxe_odp_mr_copy(struct rxe_mr *mr, u64 iova, void *addr, int length,

return err;
}
+
+static inline void *rxe_odp_get_virt_atomic(struct rxe_qp *qp, struct rxe_mr *mr)
+{
+ struct ib_umem_odp *umem_odp = to_ib_umem_odp(mr->umem);
+ u64 iova = qp->resp.va + qp->resp.offset;
+ int idx;
+ size_t offset;
+
+ if (rxe_odp_map_range(mr, iova, sizeof(char), 0))
+ return NULL;
+
+ idx = (iova - ib_umem_start(umem_odp)) >> umem_odp->page_shift;
+ offset = iova & (BIT(umem_odp->page_shift) - 1);
+
+ return rxe_odp_get_virt(umem_odp, idx, offset);
+}
+
+enum resp_states rxe_odp_atomic_ops(struct rxe_qp *qp, struct rxe_pkt_info *pkt,
+ struct rxe_mr *mr)
+{
+ struct ib_umem_odp *umem_odp = to_ib_umem_odp(mr->umem);
+ u64 *vaddr;
+ int ret;
+
+ if (unlikely(!mr->odp_enabled))
+ return RESPST_ERR_RKEY_VIOLATION;
+
+ /* If pagefault is not required, umem mutex will be held until the
+ * atomic operation completes. Otherwise, it is released and locked
+ * again in rxe_odp_map_range() to let invalidation handler do its
+ * work meanwhile.
+ */
+ mutex_lock(&umem_odp->umem_mutex);
+
+ vaddr = (u64 *)rxe_odp_get_virt_atomic(qp, mr);
+ if (!vaddr)
+ return RESPST_ERR_RKEY_VIOLATION;
+
+ if (pkt->mask & RXE_ATOMIC_MASK)
+ ret = rxe_process_atomic(qp, pkt, vaddr);
+ else
+ ret = RESPST_ERR_UNSUPPORTED_OPCODE;
+
+ mutex_unlock(&umem_odp->umem_mutex);
+ return ret;
+}
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index 7ef492e50e20..669d3e1a6ee4 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -784,7 +784,7 @@ static enum resp_states rxe_atomic_reply(struct rxe_qp *qp,
return RESPST_ERR_RKEY_VIOLATION;

if (mr->odp_enabled)
- ret = RESPST_ERR_UNSUPPORTED_OPCODE;
+ ret = rxe_odp_atomic_ops(qp, pkt, mr);
else
ret = rxe_atomic_ops(qp, pkt, mr);
} else
--
2.31.1

Subject: [PATCH for-next v3 4/7] RDMA/rxe: Add page invalidation support

On page invalidation, an MMU notifier callback is invoked to unmap DMA
addresses and update the driver page table(umem_odp->dma_list). The
callback is registered when an ODP-enabled MR is created.

Signed-off-by: Daisuke Matsuda <[email protected]>
---
drivers/infiniband/sw/rxe/Makefile | 2 ++
drivers/infiniband/sw/rxe/rxe_odp.c | 34 +++++++++++++++++++++++++++++
2 files changed, 36 insertions(+)
create mode 100644 drivers/infiniband/sw/rxe/rxe_odp.c

diff --git a/drivers/infiniband/sw/rxe/Makefile b/drivers/infiniband/sw/rxe/Makefile
index 5395a581f4bb..93134f1d1d0c 100644
--- a/drivers/infiniband/sw/rxe/Makefile
+++ b/drivers/infiniband/sw/rxe/Makefile
@@ -23,3 +23,5 @@ rdma_rxe-y := \
rxe_task.o \
rxe_net.o \
rxe_hw_counters.o
+
+rdma_rxe-$(CONFIG_INFINIBAND_ON_DEMAND_PAGING) += rxe_odp.o
diff --git a/drivers/infiniband/sw/rxe/rxe_odp.c b/drivers/infiniband/sw/rxe/rxe_odp.c
new file mode 100644
index 000000000000..0787a9b19646
--- /dev/null
+++ b/drivers/infiniband/sw/rxe/rxe_odp.c
@@ -0,0 +1,34 @@
+// SPDX-License-Identifier: GPL-2.0 OR Linux-OpenIB
+/*
+ * Copyright (c) 2022 Fujitsu Ltd. All rights reserved.
+ */
+
+#include <rdma/ib_umem_odp.h>
+
+static bool rxe_ib_invalidate_range(struct mmu_interval_notifier *mni,
+ const struct mmu_notifier_range *range,
+ unsigned long cur_seq)
+{
+ struct ib_umem_odp *umem_odp =
+ container_of(mni, struct ib_umem_odp, notifier);
+ unsigned long start;
+ unsigned long end;
+
+ if (!mmu_notifier_range_blockable(range))
+ return false;
+
+ mutex_lock(&umem_odp->umem_mutex);
+ mmu_interval_set_seq(mni, cur_seq);
+
+ start = max_t(u64, ib_umem_start(umem_odp), range->start);
+ end = min_t(u64, ib_umem_end(umem_odp), range->end);
+
+ ib_umem_odp_unmap_dma_pages(umem_odp, start, end);
+
+ mutex_unlock(&umem_odp->umem_mutex);
+ return true;
+}
+
+const struct mmu_interval_notifier_ops rxe_mn_ops = {
+ .invalidate = rxe_ib_invalidate_range,
+};
--
2.31.1

Subject: [PATCH for-next v3 5/7] RDMA/rxe: Allow registering MRs for On-Demand Paging

Allow applications to register an ODP-enabled MR, in which case the flag
IB_ACCESS_ON_DEMAND is passed to rxe_reg_user_mr(). However, there is no
RDMA operation supported right now. They will be enabled later in the
subsequent two patches.

rxe_odp_do_pagefault() is called to initialize an ODP-enabled MR. It syncs
process address space from the CPU page table to the driver page table
(dma_list/pfn_list in umem_odp) when called with RXE_PAGEFAULT_SNAPSHOT
flag. Additionally, It can be used to trigger page fault when pages being
accessed are not present or do not have proper read/write permissions, and
possibly to prefetch pages in the future.

Signed-off-by: Daisuke Matsuda <[email protected]>
---
drivers/infiniband/sw/rxe/rxe.c | 7 +++
drivers/infiniband/sw/rxe/rxe_loc.h | 16 ++++++
drivers/infiniband/sw/rxe/rxe_mr.c | 7 ++-
drivers/infiniband/sw/rxe/rxe_odp.c | 83 +++++++++++++++++++++++++++
drivers/infiniband/sw/rxe/rxe_resp.c | 33 +++++++++--
drivers/infiniband/sw/rxe/rxe_verbs.c | 7 ++-
drivers/infiniband/sw/rxe/rxe_verbs.h | 2 +
7 files changed, 147 insertions(+), 8 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe.c b/drivers/infiniband/sw/rxe/rxe.c
index 3c7e42e5b0c7..7a8a09487f96 100644
--- a/drivers/infiniband/sw/rxe/rxe.c
+++ b/drivers/infiniband/sw/rxe/rxe.c
@@ -73,6 +73,13 @@ static void rxe_init_device_param(struct rxe_dev *rxe)
rxe->ndev->dev_addr);

rxe->max_ucontext = RXE_MAX_UCONTEXT;
+
+ if (IS_ENABLED(CONFIG_INFINIBAND_ON_DEMAND_PAGING)) {
+ rxe->attr.kernel_cap_flags |= IBK_ON_DEMAND_PAGING;
+
+ /* IB_ODP_SUPPORT_IMPLICIT is not supported right now. */
+ rxe->attr.odp_caps.general_caps |= IB_ODP_SUPPORT;
+ }
}

/* initialize port attributes */
diff --git a/drivers/infiniband/sw/rxe/rxe_loc.h b/drivers/infiniband/sw/rxe/rxe_loc.h
index d567aa65b5e0..ab66a07e5700 100644
--- a/drivers/infiniband/sw/rxe/rxe_loc.h
+++ b/drivers/infiniband/sw/rxe/rxe_loc.h
@@ -60,6 +60,7 @@ int rxe_mmap(struct ib_ucontext *context, struct vm_area_struct *vma);

/* rxe_mr.c */
u8 rxe_get_next_key(u32 last_key);
+void rxe_mr_init(int access, struct rxe_mr *mr);
void rxe_mr_init_dma(int access, struct rxe_mr *mr);
int rxe_mr_init_user(struct rxe_dev *rxe, u64 start, u64 length, u64 iova,
int access, struct rxe_mr *mr);
@@ -185,4 +186,19 @@ static inline unsigned int wr_opcode_mask(int opcode, struct rxe_qp *qp)
return rxe_wr_opcode_info[opcode].mask[qp->ibqp.qp_type];
}

+#ifdef CONFIG_INFINIBAND_ON_DEMAND_PAGING
+/* rxe_odp.c */
+int rxe_create_user_odp_mr(struct ib_pd *pd, u64 start, u64 length, u64 iova,
+ int access_flags, struct rxe_mr *mr);
+
+#else /* CONFIG_INFINIBAND_ON_DEMAND_PAGING */
+static inline int
+rxe_create_user_odp_mr(struct ib_pd *pd, u64 start, u64 length, u64 iova,
+ int access_flags, struct rxe_mr *mr)
+{
+ return -EOPNOTSUPP;
+}
+
+#endif /* CONFIG_INFINIBAND_ON_DEMAND_PAGING */
+
#endif /* RXE_LOC_H */
diff --git a/drivers/infiniband/sw/rxe/rxe_mr.c b/drivers/infiniband/sw/rxe/rxe_mr.c
index 072eac4b65d2..261bb462341b 100644
--- a/drivers/infiniband/sw/rxe/rxe_mr.c
+++ b/drivers/infiniband/sw/rxe/rxe_mr.c
@@ -49,7 +49,7 @@ int mr_check_range(struct rxe_mr *mr, u64 iova, size_t length)
| IB_ACCESS_REMOTE_WRITE \
| IB_ACCESS_REMOTE_ATOMIC)

-static void rxe_mr_init(int access, struct rxe_mr *mr)
+void rxe_mr_init(int access, struct rxe_mr *mr)
{
u32 lkey = mr->elem.index << 8 | rxe_get_next_key(-1);
u32 rkey = (access & IB_ACCESS_REMOTE) ? lkey : 0;
@@ -478,7 +478,10 @@ int copy_data(
if (bytes > 0) {
iova = sge->addr + offset;

- err = rxe_mr_copy(mr, iova, addr, bytes, dir);
+ if (mr->odp_enabled)
+ err = -EOPNOTSUPP;
+ else
+ err = rxe_mr_copy(mr, iova, addr, bytes, dir);
if (err)
goto err2;

diff --git a/drivers/infiniband/sw/rxe/rxe_odp.c b/drivers/infiniband/sw/rxe/rxe_odp.c
index 0787a9b19646..8b01ceaeee36 100644
--- a/drivers/infiniband/sw/rxe/rxe_odp.c
+++ b/drivers/infiniband/sw/rxe/rxe_odp.c
@@ -5,6 +5,8 @@

#include <rdma/ib_umem_odp.h>

+#include "rxe.h"
+
static bool rxe_ib_invalidate_range(struct mmu_interval_notifier *mni,
const struct mmu_notifier_range *range,
unsigned long cur_seq)
@@ -32,3 +34,84 @@ static bool rxe_ib_invalidate_range(struct mmu_interval_notifier *mni,
const struct mmu_interval_notifier_ops rxe_mn_ops = {
.invalidate = rxe_ib_invalidate_range,
};
+
+#define RXE_PAGEFAULT_RDONLY BIT(1)
+#define RXE_PAGEFAULT_SNAPSHOT BIT(2)
+static int rxe_odp_do_pagefault(struct rxe_mr *mr, u64 user_va, int bcnt, u32 flags)
+{
+ int np;
+ u64 access_mask;
+ bool fault = !(flags & RXE_PAGEFAULT_SNAPSHOT);
+ struct ib_umem_odp *umem_odp = to_ib_umem_odp(mr->umem);
+
+ access_mask = ODP_READ_ALLOWED_BIT;
+ if (umem_odp->umem.writable && !(flags & RXE_PAGEFAULT_RDONLY))
+ access_mask |= ODP_WRITE_ALLOWED_BIT;
+
+ /*
+ * ib_umem_odp_map_dma_and_lock() locks umem_mutex on success.
+ * Callers must release the lock later to let invalidation handler
+ * do its work again.
+ */
+ np = ib_umem_odp_map_dma_and_lock(umem_odp, user_va, bcnt,
+ access_mask, fault);
+
+ return np;
+}
+
+static int rxe_init_odp_mr(struct rxe_mr *mr)
+{
+ int ret;
+ struct ib_umem_odp *umem_odp = to_ib_umem_odp(mr->umem);
+
+ ret = rxe_odp_do_pagefault(mr, mr->umem->address, mr->umem->length,
+ RXE_PAGEFAULT_SNAPSHOT);
+
+ if (ret >= 0)
+ mutex_unlock(&umem_odp->umem_mutex);
+
+ return ret >= 0 ? 0 : ret;
+}
+
+int rxe_create_user_odp_mr(struct ib_pd *pd, u64 start, u64 length, u64 iova,
+ int access_flags, struct rxe_mr *mr)
+{
+ int err;
+ struct ib_umem_odp *umem_odp;
+ struct rxe_dev *dev = container_of(pd->device, struct rxe_dev, ib_dev);
+
+ if (!IS_ENABLED(CONFIG_INFINIBAND_ON_DEMAND_PAGING))
+ return -EOPNOTSUPP;
+
+ rxe_mr_init(access_flags, mr);
+
+ if (!start && length == U64_MAX) {
+ if (iova != 0)
+ return -EINVAL;
+ if (!(dev->attr.odp_caps.general_caps & IB_ODP_SUPPORT_IMPLICIT))
+ return -EINVAL;
+
+ /* Never reach here, for implicit ODP is not implemented. */
+ }
+
+ umem_odp = ib_umem_odp_get(pd->device, start, length, access_flags,
+ &rxe_mn_ops);
+ if (IS_ERR(umem_odp))
+ return PTR_ERR(umem_odp);
+
+ umem_odp->private = mr;
+
+ mr->odp_enabled = true;
+ mr->ibmr.pd = pd;
+ mr->umem = &umem_odp->umem;
+ mr->access = access_flags;
+ mr->ibmr.length = length;
+ mr->ibmr.iova = iova;
+ mr->offset = ib_umem_offset(&umem_odp->umem);
+ mr->state = RXE_MR_STATE_VALID;
+ mr->ibmr.type = IB_MR_TYPE_USER;
+
+ err = rxe_init_odp_mr(mr);
+
+ return err;
+}
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index e18bca076337..f2c803036888 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -627,8 +627,16 @@ static enum resp_states write_data_in(struct rxe_qp *qp,
int err;
int data_len = payload_size(pkt);

- err = rxe_mr_copy(qp->resp.mr, qp->resp.va + qp->resp.offset,
- payload_addr(pkt), data_len, RXE_TO_MR_OBJ);
+ /* resp.mr is not set in check_rkey() for zero byte operations */
+ if (data_len == 0)
+ goto out;
+
+ if (qp->resp.mr->odp_enabled)
+ err = RESPST_ERR_UNSUPPORTED_OPCODE;
+ else
+ err = rxe_mr_copy(qp->resp.mr, qp->resp.va + qp->resp.offset,
+ payload_addr(pkt), data_len, RXE_TO_MR_OBJ);
+
if (err) {
rc = RESPST_ERR_RKEY_VIOLATION;
goto out;
@@ -693,6 +701,9 @@ static enum resp_states process_flush(struct rxe_qp *qp,
struct rxe_mr *mr = qp->resp.mr;
struct resp_res *res = qp->resp.res;

+ if (mr->odp_enabled)
+ return RESPST_ERR_UNSUPPORTED_OPCODE;
+
/* oA19-14, oA19-15 */
if (res && res->replay)
return RESPST_ACKNOWLEDGE;
@@ -807,7 +818,11 @@ static enum resp_states rxe_atomic_reply(struct rxe_qp *qp,
if (!res->replay) {
if (mr->state != RXE_MR_STATE_VALID)
return RESPST_ERR_RKEY_VIOLATION;
- ret = rxe_atomic_ops(qp, pkt, mr);
+
+ if (mr->odp_enabled)
+ ret = RESPST_ERR_UNSUPPORTED_OPCODE;
+ else
+ ret = rxe_atomic_ops(qp, pkt, mr);
} else
ret = RESPST_ACKNOWLEDGE;

@@ -822,6 +837,9 @@ static enum resp_states do_atomic_write(struct rxe_qp *qp,
int payload = payload_size(pkt);
u64 src, *dst;

+ if (mr->odp_enabled)
+ return RESPST_ERR_UNSUPPORTED_OPCODE;
+
if (mr->state != RXE_MR_STATE_VALID)
return RESPST_ERR_RKEY_VIOLATION;

@@ -1031,8 +1049,13 @@ static enum resp_states read_reply(struct rxe_qp *qp,
return RESPST_ERR_RNR;
}

- err = rxe_mr_copy(mr, res->read.va, payload_addr(&ack_pkt),
- payload, RXE_FROM_MR_OBJ);
+ /* mr is NULL for a zero byte operation. */
+ if ((res->read.resid != 0) && mr->odp_enabled)
+ err = RESPST_ERR_UNSUPPORTED_OPCODE;
+ else
+ err = rxe_mr_copy(mr, res->read.va, payload_addr(&ack_pkt),
+ payload, RXE_FROM_MR_OBJ);
+
if (mr)
rxe_put(mr);
if (err) {
diff --git a/drivers/infiniband/sw/rxe/rxe_verbs.c b/drivers/infiniband/sw/rxe/rxe_verbs.c
index 025b35bf014e..54cbb7a86019 100644
--- a/drivers/infiniband/sw/rxe/rxe_verbs.c
+++ b/drivers/infiniband/sw/rxe/rxe_verbs.c
@@ -903,7 +903,12 @@ static struct ib_mr *rxe_reg_user_mr(struct ib_pd *ibpd,
mr->ibmr.pd = ibpd;
mr->ibmr.device = ibpd->device;

- err = rxe_mr_init_user(rxe, start, length, iova, access, mr);
+ if (access & IB_ACCESS_ON_DEMAND)
+ err = rxe_create_user_odp_mr(&pd->ibpd, start, length, iova,
+ access, mr);
+ else
+ err = rxe_mr_init_user(rxe, start, length, iova, access, mr);
+
if (err)
goto err1;

diff --git a/drivers/infiniband/sw/rxe/rxe_verbs.h b/drivers/infiniband/sw/rxe/rxe_verbs.h
index 19ddfa890480..f90116576e7b 100644
--- a/drivers/infiniband/sw/rxe/rxe_verbs.h
+++ b/drivers/infiniband/sw/rxe/rxe_verbs.h
@@ -327,6 +327,8 @@ struct rxe_mr {
atomic_t num_mw;

struct rxe_map **map;
+
+ bool odp_enabled;
};

enum rxe_mw_state {
--
2.31.1

Subject: [PATCH for-next v3 3/7] RDMA/rxe: Cleanup code for responder Atomic operations

Currently, rxe_responder() directly calls the function to execute Atomic
operations. This need to be modified to insert some conditional branches
for the ODP feature. Additionally, rxe_resp.h is newly added to be used by
rxe_odp.c in near future.

Signed-off-by: Daisuke Matsuda <[email protected]>
---
drivers/infiniband/sw/rxe/rxe_resp.c | 100 +++++++++++++++++----------
drivers/infiniband/sw/rxe/rxe_resp.h | 9 +++
2 files changed, 71 insertions(+), 38 deletions(-)
create mode 100644 drivers/infiniband/sw/rxe/rxe_resp.h

diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index 991550baef8c..e18bca076337 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -9,6 +9,7 @@
#include "rxe.h"
#include "rxe_loc.h"
#include "rxe_queue.h"
+#include "rxe_resp.h"

enum resp_states {
RESPST_NONE,
@@ -733,60 +734,83 @@ static enum resp_states process_flush(struct rxe_qp *qp,
/* Guarantee atomicity of atomic operations at the machine level. */
static DEFINE_SPINLOCK(atomic_ops_lock);

-static enum resp_states atomic_reply(struct rxe_qp *qp,
- struct rxe_pkt_info *pkt)
+enum resp_states rxe_process_atomic(struct rxe_qp *qp,
+ struct rxe_pkt_info *pkt, u64 *vaddr)
{
- u64 *vaddr;
enum resp_states ret;
- struct rxe_mr *mr = qp->resp.mr;
struct resp_res *res = qp->resp.res;
u64 value;

- if (!res) {
- res = rxe_prepare_res(qp, pkt, RXE_ATOMIC_MASK);
- qp->resp.res = res;
+ /* check vaddr is 8 bytes aligned. */
+ if (!vaddr || (uintptr_t)vaddr & 7) {
+ ret = RESPST_ERR_MISALIGNED_ATOMIC;
+ goto out;
}

- if (!res->replay) {
- if (mr->state != RXE_MR_STATE_VALID) {
- ret = RESPST_ERR_RKEY_VIOLATION;
- goto out;
- }
+ spin_lock(&atomic_ops_lock);
+ res->atomic.orig_val = value = *vaddr;

- vaddr = iova_to_vaddr(mr, qp->resp.va + qp->resp.offset,
- sizeof(u64));
+ if (pkt->opcode == IB_OPCODE_RC_COMPARE_SWAP) {
+ if (value == atmeth_comp(pkt))
+ value = atmeth_swap_add(pkt);
+ } else {
+ value += atmeth_swap_add(pkt);
+ }

- /* check vaddr is 8 bytes aligned. */
- if (!vaddr || (uintptr_t)vaddr & 7) {
- ret = RESPST_ERR_MISALIGNED_ATOMIC;
- goto out;
- }
+ *vaddr = value;
+ spin_unlock(&atomic_ops_lock);

- spin_lock_bh(&atomic_ops_lock);
- res->atomic.orig_val = value = *vaddr;
+ qp->resp.msn++;

- if (pkt->opcode == IB_OPCODE_RC_COMPARE_SWAP) {
- if (value == atmeth_comp(pkt))
- value = atmeth_swap_add(pkt);
- } else {
- value += atmeth_swap_add(pkt);
- }
+ /* next expected psn, read handles this separately */
+ qp->resp.psn = (pkt->psn + 1) & BTH_PSN_MASK;
+ qp->resp.ack_psn = qp->resp.psn;

- *vaddr = value;
- spin_unlock_bh(&atomic_ops_lock);
+ qp->resp.opcode = pkt->opcode;
+ qp->resp.status = IB_WC_SUCCESS;

- qp->resp.msn++;
+ ret = RESPST_ACKNOWLEDGE;
+out:
+ return ret;
+}
+
+static enum resp_states rxe_atomic_ops(struct rxe_qp *qp,
+ struct rxe_pkt_info *pkt,
+ struct rxe_mr *mr)
+{
+ u64 *vaddr;
+ int ret;
+
+ vaddr = iova_to_vaddr(mr, qp->resp.va + qp->resp.offset,
+ sizeof(u64));
+
+ if (pkt->mask & RXE_ATOMIC_MASK)
+ ret = rxe_process_atomic(qp, pkt, vaddr);
+ else
+ ret = RESPST_ERR_UNSUPPORTED_OPCODE;
+
+ return ret;
+}

- /* next expected psn, read handles this separately */
- qp->resp.psn = (pkt->psn + 1) & BTH_PSN_MASK;
- qp->resp.ack_psn = qp->resp.psn;
+static enum resp_states rxe_atomic_reply(struct rxe_qp *qp,
+ struct rxe_pkt_info *pkt)
+{
+ struct rxe_mr *mr = qp->resp.mr;
+ struct resp_res *res = qp->resp.res;
+ int ret;

- qp->resp.opcode = pkt->opcode;
- qp->resp.status = IB_WC_SUCCESS;
+ if (!res) {
+ res = rxe_prepare_res(qp, pkt, RXE_ATOMIC_MASK);
+ qp->resp.res = res;
}

- ret = RESPST_ACKNOWLEDGE;
-out:
+ if (!res->replay) {
+ if (mr->state != RXE_MR_STATE_VALID)
+ return RESPST_ERR_RKEY_VIOLATION;
+ ret = rxe_atomic_ops(qp, pkt, mr);
+ } else
+ ret = RESPST_ACKNOWLEDGE;
+
return ret;
}

@@ -1556,7 +1580,7 @@ int rxe_responder(void *arg)
state = read_reply(qp, pkt);
break;
case RESPST_ATOMIC_REPLY:
- state = atomic_reply(qp, pkt);
+ state = rxe_atomic_reply(qp, pkt);
break;
case RESPST_ATOMIC_WRITE_REPLY:
state = atomic_write_reply(qp, pkt);
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.h b/drivers/infiniband/sw/rxe/rxe_resp.h
new file mode 100644
index 000000000000..94a4869fdab6
--- /dev/null
+++ b/drivers/infiniband/sw/rxe/rxe_resp.h
@@ -0,0 +1,9 @@
+/* SPDX-License-Identifier: GPL-2.0 OR Linux-OpenIB */
+
+#ifndef RXE_RESP_H
+#define RXE_RESP_H
+
+enum resp_states rxe_process_atomic(struct rxe_qp *qp,
+ struct rxe_pkt_info *pkt, u64 *vaddr);
+
+#endif /* RXE_RESP_H */
--
2.31.1

Subject: [PATCH for-next v3 1/7] RDMA/rxe: Convert triple tasklets to use workqueue

In order to implement On-Demand Paging on the rxe driver, triple tasklets
(requester, responder, and completer) must be allowed to sleep so that they
can trigger page fault when pages being accessed are not present.

This patch replaces the tasklets with a workqueue, but still allows direct-
call of works from softirq context if it is obvious that MRs are not going
to be accessed and there is no work being processed in the workqueue.

As counterparts to tasklet_disable() and tasklet_enable() are missing for
workqueues, an atomic value is introduced to prevent work items from being
scheduled while qp reset is in progress.

The way to initialize/destroy workqueue is picked up from the
implementation of Ian Ziemba and Bob Pearson at HPE.

Link: https://lore.kernel.org/all/[email protected]/
Signed-off-by: Daisuke Matsuda <[email protected]>
---
drivers/infiniband/sw/rxe/rxe.c | 9 ++++-
drivers/infiniband/sw/rxe/rxe_comp.c | 2 +-
drivers/infiniband/sw/rxe/rxe_param.h | 2 +-
drivers/infiniband/sw/rxe/rxe_qp.c | 2 +-
drivers/infiniband/sw/rxe/rxe_req.c | 2 +-
drivers/infiniband/sw/rxe/rxe_resp.c | 2 +-
drivers/infiniband/sw/rxe/rxe_task.c | 52 ++++++++++++++++++++-------
drivers/infiniband/sw/rxe/rxe_task.h | 15 ++++++--
8 files changed, 65 insertions(+), 21 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe.c b/drivers/infiniband/sw/rxe/rxe.c
index 136c2efe3466..3c7e42e5b0c7 100644
--- a/drivers/infiniband/sw/rxe/rxe.c
+++ b/drivers/infiniband/sw/rxe/rxe.c
@@ -210,10 +210,16 @@ static int __init rxe_module_init(void)
{
int err;

- err = rxe_net_init();
+ err = rxe_alloc_wq();
if (err)
return err;

+ err = rxe_net_init();
+ if (err) {
+ rxe_destroy_wq();
+ return err;
+ }
+
rdma_link_register(&rxe_link_ops);
pr_info("loaded\n");
return 0;
@@ -224,6 +230,7 @@ static void __exit rxe_module_exit(void)
rdma_link_unregister(&rxe_link_ops);
ib_unregister_driver(RDMA_DRIVER_RXE);
rxe_net_exit();
+ rxe_destroy_wq();

pr_info("unloaded\n");
}
diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
index 20737fec392b..046bbacce37c 100644
--- a/drivers/infiniband/sw/rxe/rxe_comp.c
+++ b/drivers/infiniband/sw/rxe/rxe_comp.c
@@ -773,7 +773,7 @@ int rxe_completer(void *arg)
}

/* A non-zero return value will cause rxe_do_task to
- * exit its loop and end the tasklet. A zero return
+ * exit its loop and end the work item. A zero return
* will continue looping and return to rxe_completer
*/
done:
diff --git a/drivers/infiniband/sw/rxe/rxe_param.h b/drivers/infiniband/sw/rxe/rxe_param.h
index a754fc902e3d..bd8050e99d6b 100644
--- a/drivers/infiniband/sw/rxe/rxe_param.h
+++ b/drivers/infiniband/sw/rxe/rxe_param.h
@@ -112,7 +112,7 @@ enum rxe_device_param {
RXE_INFLIGHT_SKBS_PER_QP_HIGH = 64,
RXE_INFLIGHT_SKBS_PER_QP_LOW = 16,

- /* Max number of interations of each tasklet
+ /* Max number of interations of each work item
* before yielding the cpu to let other
* work make progress
*/
diff --git a/drivers/infiniband/sw/rxe/rxe_qp.c b/drivers/infiniband/sw/rxe/rxe_qp.c
index ab72db68b58f..e033b2449dfe 100644
--- a/drivers/infiniband/sw/rxe/rxe_qp.c
+++ b/drivers/infiniband/sw/rxe/rxe_qp.c
@@ -471,7 +471,7 @@ int rxe_qp_chk_attr(struct rxe_dev *rxe, struct rxe_qp *qp,
/* move the qp to the reset state */
static void rxe_qp_reset(struct rxe_qp *qp)
{
- /* stop tasks from running */
+ /* flush workqueue and stop tasks from running */
rxe_disable_task(&qp->resp.task);

/* stop request/comp */
diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
index 899c8779f800..2bcd287a2c3b 100644
--- a/drivers/infiniband/sw/rxe/rxe_req.c
+++ b/drivers/infiniband/sw/rxe/rxe_req.c
@@ -830,7 +830,7 @@ int rxe_requester(void *arg)
update_state(qp, &pkt);

/* A non-zero return value will cause rxe_do_task to
- * exit its loop and end the tasklet. A zero return
+ * exit its loop and end the work item. A zero return
* will continue looping and return to rxe_requester
*/
done:
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index c74972244f08..d9134a00a529 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -1691,7 +1691,7 @@ int rxe_responder(void *arg)
}

/* A non-zero return value will cause rxe_do_task to
- * exit its loop and end the tasklet. A zero return
+ * exit its loop and end the work item. A zero return
* will continue looping and return to rxe_responder
*/
done:
diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
index 60b90e33a884..b96f72aa9005 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.c
+++ b/drivers/infiniband/sw/rxe/rxe_task.c
@@ -6,6 +6,22 @@

#include "rxe.h"

+static struct workqueue_struct *rxe_wq;
+
+int rxe_alloc_wq(void)
+{
+ rxe_wq = alloc_workqueue("rxe_wq", WQ_CPU_INTENSIVE, WQ_MAX_ACTIVE);
+ if (!rxe_wq)
+ return -ENOMEM;
+
+ return 0;
+}
+
+void rxe_destroy_wq(void)
+{
+ destroy_workqueue(rxe_wq);
+}
+
int __rxe_do_task(struct rxe_task *task)

{
@@ -24,11 +40,11 @@ int __rxe_do_task(struct rxe_task *task)
* a second caller finds the task already running
* but looks just after the last call to func
*/
-static void do_task(struct tasklet_struct *t)
+static void do_task(struct work_struct *w)
{
int cont;
int ret;
- struct rxe_task *task = from_tasklet(task, t, tasklet);
+ struct rxe_task *task = container_of(w, typeof(*task), work);
struct rxe_qp *qp = (struct rxe_qp *)task->arg;
unsigned int iterations = RXE_MAX_ITERATIONS;

@@ -64,10 +80,10 @@ static void do_task(struct tasklet_struct *t)
} else if (iterations--) {
cont = 1;
} else {
- /* reschedule the tasklet and exit
+ /* reschedule the work item and exit
* the loop to give up the cpu
*/
- tasklet_schedule(&task->tasklet);
+ queue_work(task->workq, &task->work);
task->state = TASK_STATE_START;
}
break;
@@ -97,7 +113,8 @@ int rxe_init_task(struct rxe_task *task, void *arg, int (*func)(void *))
task->func = func;
task->destroyed = false;

- tasklet_setup(&task->tasklet, do_task);
+ INIT_WORK(&task->work, do_task);
+ task->workq = rxe_wq;

task->state = TASK_STATE_START;
spin_lock_init(&task->lock);
@@ -111,17 +128,16 @@ void rxe_cleanup_task(struct rxe_task *task)

/*
* Mark the task, then wait for it to finish. It might be
- * running in a non-tasklet (direct call) context.
+ * running in a non-workqueue (direct call) context.
*/
task->destroyed = true;
+ flush_workqueue(task->workq);

do {
spin_lock_bh(&task->lock);
idle = (task->state == TASK_STATE_START);
spin_unlock_bh(&task->lock);
} while (!idle);
-
- tasklet_kill(&task->tasklet);
}

void rxe_run_task(struct rxe_task *task)
@@ -129,7 +145,7 @@ void rxe_run_task(struct rxe_task *task)
if (task->destroyed)
return;

- do_task(&task->tasklet);
+ do_task(&task->work);
}

void rxe_sched_task(struct rxe_task *task)
@@ -137,15 +153,27 @@ void rxe_sched_task(struct rxe_task *task)
if (task->destroyed)
return;

- tasklet_schedule(&task->tasklet);
+ /*
+ * busy-loop while qp reset is in progress.
+ * This may be called from softirq context and thus cannot sleep.
+ */
+ while (atomic_read(&task->suspended))
+ cpu_relax();
+
+ queue_work(task->workq, &task->work);
}

void rxe_disable_task(struct rxe_task *task)
{
- tasklet_disable(&task->tasklet);
+ /* Alternative to tasklet_disable() */
+ atomic_inc(&task->suspended);
+ smp_mb__after_atomic();
+ flush_workqueue(task->workq);
}

void rxe_enable_task(struct rxe_task *task)
{
- tasklet_enable(&task->tasklet);
+ /* Alternative to tasklet_enable() */
+ smp_mb__before_atomic();
+ atomic_dec(&task->suspended);
}
diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
index 7b88129702ac..9aa3f236e886 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.h
+++ b/drivers/infiniband/sw/rxe/rxe_task.h
@@ -19,15 +19,22 @@ enum {
* called again.
*/
struct rxe_task {
- struct tasklet_struct tasklet;
+ struct workqueue_struct *workq;
+ struct work_struct work;
int state;
spinlock_t lock;
void *arg;
int (*func)(void *arg);
int ret;
bool destroyed;
+ /* used to {dis, en}able per-qp work items */
+ atomic_t suspended;
};

+int rxe_alloc_wq(void);
+
+void rxe_destroy_wq(void);
+
/*
* init rxe_task structure
* arg => parameter to pass to fcn
@@ -40,18 +47,20 @@ void rxe_cleanup_task(struct rxe_task *task);

/*
* raw call to func in loop without any checking
- * can call when tasklets are disabled
+ * can call when tasks are suspended
*/
int __rxe_do_task(struct rxe_task *task);

+/* run a task without scheduling */
void rxe_run_task(struct rxe_task *task);

+/* schedule a task into workqueue */
void rxe_sched_task(struct rxe_task *task);

/* keep a task from scheduling */
void rxe_disable_task(struct rxe_task *task);

-/* allow task to run */
+/* allow a task to run again */
void rxe_enable_task(struct rxe_task *task);

#endif /* RXE_TASK_H */
--
2.31.1

Subject: [PATCH for-next v3 2/7] RDMA/rxe: Always schedule works before accessing user MRs

Both responder and completer can sleep to execute page-fault when used
with ODP, and page-fault handler can be invoked when they are going to
access user MRs, so works must be scheduled in such cases.

Signed-off-by: Daisuke Matsuda <[email protected]>
---
drivers/infiniband/sw/rxe/rxe_comp.c | 20 ++++++++++++++++++--
drivers/infiniband/sw/rxe/rxe_loc.h | 4 ++--
drivers/infiniband/sw/rxe/rxe_recv.c | 4 ++--
drivers/infiniband/sw/rxe/rxe_resp.c | 15 ++++++++++-----
4 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
index 046bbacce37c..421f4ffe51a3 100644
--- a/drivers/infiniband/sw/rxe/rxe_comp.c
+++ b/drivers/infiniband/sw/rxe/rxe_comp.c
@@ -124,13 +124,29 @@ void retransmit_timer(struct timer_list *t)
}
}

-void rxe_comp_queue_pkt(struct rxe_qp *qp, struct sk_buff *skb)
+void rxe_comp_queue_pkt(struct rxe_pkt_info *pkt, struct sk_buff *skb)
{
+ struct rxe_qp *qp = pkt->qp;
int must_sched;

skb_queue_tail(&qp->resp_pkts, skb);

- must_sched = skb_queue_len(&qp->resp_pkts) > 1;
+ /* Schedule a work item if processing READ or ATOMIC acks.
+ * In these cases, completer may sleep to access ODP-enabled MRs.
+ */
+ switch (pkt->opcode) {
+ case IB_OPCODE_RC_RDMA_READ_RESPONSE_ONLY:
+ case IB_OPCODE_RC_RDMA_READ_RESPONSE_FIRST:
+ case IB_OPCODE_RC_RDMA_READ_RESPONSE_MIDDLE:
+ case IB_OPCODE_RC_RDMA_READ_RESPONSE_LAST:
+ case IB_OPCODE_RC_ATOMIC_ACKNOWLEDGE:
+ must_sched = 1;
+ break;
+
+ default:
+ must_sched = skb_queue_len(&qp->resp_pkts) > 1;
+ }
+
if (must_sched != 0)
rxe_counter_inc(SKB_TO_PKT(skb)->rxe, RXE_CNT_COMPLETER_SCHED);

diff --git a/drivers/infiniband/sw/rxe/rxe_loc.h b/drivers/infiniband/sw/rxe/rxe_loc.h
index 948ce4902b10..d567aa65b5e0 100644
--- a/drivers/infiniband/sw/rxe/rxe_loc.h
+++ b/drivers/infiniband/sw/rxe/rxe_loc.h
@@ -176,9 +176,9 @@ int rxe_icrc_init(struct rxe_dev *rxe);
int rxe_icrc_check(struct sk_buff *skb, struct rxe_pkt_info *pkt);
void rxe_icrc_generate(struct sk_buff *skb, struct rxe_pkt_info *pkt);

-void rxe_resp_queue_pkt(struct rxe_qp *qp, struct sk_buff *skb);
+void rxe_resp_queue_pkt(struct rxe_pkt_info *pkt, struct sk_buff *skb);

-void rxe_comp_queue_pkt(struct rxe_qp *qp, struct sk_buff *skb);
+void rxe_comp_queue_pkt(struct rxe_pkt_info *pkt, struct sk_buff *skb);

static inline unsigned int wr_opcode_mask(int opcode, struct rxe_qp *qp)
{
diff --git a/drivers/infiniband/sw/rxe/rxe_recv.c b/drivers/infiniband/sw/rxe/rxe_recv.c
index 434a693cd4a5..01d07572a56f 100644
--- a/drivers/infiniband/sw/rxe/rxe_recv.c
+++ b/drivers/infiniband/sw/rxe/rxe_recv.c
@@ -174,9 +174,9 @@ static int hdr_check(struct rxe_pkt_info *pkt)
static inline void rxe_rcv_pkt(struct rxe_pkt_info *pkt, struct sk_buff *skb)
{
if (pkt->mask & RXE_REQ_MASK)
- rxe_resp_queue_pkt(pkt->qp, skb);
+ rxe_resp_queue_pkt(pkt, skb);
else
- rxe_comp_queue_pkt(pkt->qp, skb);
+ rxe_comp_queue_pkt(pkt, skb);
}

static void rxe_rcv_mcast_pkt(struct rxe_dev *rxe, struct sk_buff *skb)
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index d9134a00a529..991550baef8c 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -85,15 +85,20 @@ static char *resp_state_name[] = {
};

/* rxe_recv calls here to add a request packet to the input queue */
-void rxe_resp_queue_pkt(struct rxe_qp *qp, struct sk_buff *skb)
+void rxe_resp_queue_pkt(struct rxe_pkt_info *pkt, struct sk_buff *skb)
{
- int must_sched;
- struct rxe_pkt_info *pkt = SKB_TO_PKT(skb);
+ int must_sched = 1;
+ struct rxe_qp *qp = pkt->qp;

skb_queue_tail(&qp->req_pkts, skb);

- must_sched = (pkt->opcode == IB_OPCODE_RC_RDMA_READ_REQUEST) ||
- (skb_queue_len(&qp->req_pkts) > 1);
+ /* responder can sleep to access an ODP-enabled MR. Always schedule
+ * work items for non-zero-byte operations, RDMA READ, and ATOMIC
+ * operations.
+ */
+ if ((skb_queue_len(&qp->req_pkts) == 1) && (payload_size(pkt) == 0)
+ && !(pkt->mask & RXE_READ_OR_ATOMIC_MASK))
+ must_sched = 0;

if (must_sched)
rxe_sched_task(&qp->resp.task);
--
2.31.1

2022-12-28 17:29:34

by Bob Pearson

[permalink] [raw]
Subject: Re: [PATCH for-next v3 1/7] RDMA/rxe: Convert triple tasklets to use workqueue

On 12/23/22 00:51, Daisuke Matsuda wrote:
> In order to implement On-Demand Paging on the rxe driver, triple tasklets
> (requester, responder, and completer) must be allowed to sleep so that they
> can trigger page fault when pages being accessed are not present.
>
> This patch replaces the tasklets with a workqueue, but still allows direct-
> call of works from softirq context if it is obvious that MRs are not going
> to be accessed and there is no work being processed in the workqueue.

There are already at least two patch sets that do this waiting to get upstream.
Bob

>
> As counterparts to tasklet_disable() and tasklet_enable() are missing for
> workqueues, an atomic value is introduced to prevent work items from being
> scheduled while qp reset is in progress.
>
> The way to initialize/destroy workqueue is picked up from the
> implementation of Ian Ziemba and Bob Pearson at HPE.
>
> Link: https://lore.kernel.org/all/[email protected]/
> Signed-off-by: Daisuke Matsuda <[email protected]>
> ---
> drivers/infiniband/sw/rxe/rxe.c | 9 ++++-
> drivers/infiniband/sw/rxe/rxe_comp.c | 2 +-
> drivers/infiniband/sw/rxe/rxe_param.h | 2 +-
> drivers/infiniband/sw/rxe/rxe_qp.c | 2 +-
> drivers/infiniband/sw/rxe/rxe_req.c | 2 +-
> drivers/infiniband/sw/rxe/rxe_resp.c | 2 +-
> drivers/infiniband/sw/rxe/rxe_task.c | 52 ++++++++++++++++++++-------
> drivers/infiniband/sw/rxe/rxe_task.h | 15 ++++++--
> 8 files changed, 65 insertions(+), 21 deletions(-)
>
> diff --git a/drivers/infiniband/sw/rxe/rxe.c b/drivers/infiniband/sw/rxe/rxe.c
> index 136c2efe3466..3c7e42e5b0c7 100644
> --- a/drivers/infiniband/sw/rxe/rxe.c
> +++ b/drivers/infiniband/sw/rxe/rxe.c
> @@ -210,10 +210,16 @@ static int __init rxe_module_init(void)
> {
> int err;
>
> - err = rxe_net_init();
> + err = rxe_alloc_wq();
> if (err)
> return err;
>
> + err = rxe_net_init();
> + if (err) {
> + rxe_destroy_wq();
> + return err;
> + }
> +
> rdma_link_register(&rxe_link_ops);
> pr_info("loaded\n");
> return 0;
> @@ -224,6 +230,7 @@ static void __exit rxe_module_exit(void)
> rdma_link_unregister(&rxe_link_ops);
> ib_unregister_driver(RDMA_DRIVER_RXE);
> rxe_net_exit();
> + rxe_destroy_wq();
>
> pr_info("unloaded\n");
> }
> diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
> index 20737fec392b..046bbacce37c 100644
> --- a/drivers/infiniband/sw/rxe/rxe_comp.c
> +++ b/drivers/infiniband/sw/rxe/rxe_comp.c
> @@ -773,7 +773,7 @@ int rxe_completer(void *arg)
> }
>
> /* A non-zero return value will cause rxe_do_task to
> - * exit its loop and end the tasklet. A zero return
> + * exit its loop and end the work item. A zero return
> * will continue looping and return to rxe_completer
> */
> done:
> diff --git a/drivers/infiniband/sw/rxe/rxe_param.h b/drivers/infiniband/sw/rxe/rxe_param.h
> index a754fc902e3d..bd8050e99d6b 100644
> --- a/drivers/infiniband/sw/rxe/rxe_param.h
> +++ b/drivers/infiniband/sw/rxe/rxe_param.h
> @@ -112,7 +112,7 @@ enum rxe_device_param {
> RXE_INFLIGHT_SKBS_PER_QP_HIGH = 64,
> RXE_INFLIGHT_SKBS_PER_QP_LOW = 16,
>
> - /* Max number of interations of each tasklet
> + /* Max number of interations of each work item
> * before yielding the cpu to let other
> * work make progress
> */
> diff --git a/drivers/infiniband/sw/rxe/rxe_qp.c b/drivers/infiniband/sw/rxe/rxe_qp.c
> index ab72db68b58f..e033b2449dfe 100644
> --- a/drivers/infiniband/sw/rxe/rxe_qp.c
> +++ b/drivers/infiniband/sw/rxe/rxe_qp.c
> @@ -471,7 +471,7 @@ int rxe_qp_chk_attr(struct rxe_dev *rxe, struct rxe_qp *qp,
> /* move the qp to the reset state */
> static void rxe_qp_reset(struct rxe_qp *qp)
> {
> - /* stop tasks from running */
> + /* flush workqueue and stop tasks from running */
> rxe_disable_task(&qp->resp.task);
>
> /* stop request/comp */
> diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
> index 899c8779f800..2bcd287a2c3b 100644
> --- a/drivers/infiniband/sw/rxe/rxe_req.c
> +++ b/drivers/infiniband/sw/rxe/rxe_req.c
> @@ -830,7 +830,7 @@ int rxe_requester(void *arg)
> update_state(qp, &pkt);
>
> /* A non-zero return value will cause rxe_do_task to
> - * exit its loop and end the tasklet. A zero return
> + * exit its loop and end the work item. A zero return
> * will continue looping and return to rxe_requester
> */
> done:
> diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
> index c74972244f08..d9134a00a529 100644
> --- a/drivers/infiniband/sw/rxe/rxe_resp.c
> +++ b/drivers/infiniband/sw/rxe/rxe_resp.c
> @@ -1691,7 +1691,7 @@ int rxe_responder(void *arg)
> }
>
> /* A non-zero return value will cause rxe_do_task to
> - * exit its loop and end the tasklet. A zero return
> + * exit its loop and end the work item. A zero return
> * will continue looping and return to rxe_responder
> */
> done:
> diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
> index 60b90e33a884..b96f72aa9005 100644
> --- a/drivers/infiniband/sw/rxe/rxe_task.c
> +++ b/drivers/infiniband/sw/rxe/rxe_task.c
> @@ -6,6 +6,22 @@
>
> #include "rxe.h"
>
> +static struct workqueue_struct *rxe_wq;
> +
> +int rxe_alloc_wq(void)
> +{
> + rxe_wq = alloc_workqueue("rxe_wq", WQ_CPU_INTENSIVE, WQ_MAX_ACTIVE);
> + if (!rxe_wq)
> + return -ENOMEM;
> +
> + return 0;
> +}
> +
> +void rxe_destroy_wq(void)
> +{
> + destroy_workqueue(rxe_wq);
> +}
> +
> int __rxe_do_task(struct rxe_task *task)
>
> {
> @@ -24,11 +40,11 @@ int __rxe_do_task(struct rxe_task *task)
> * a second caller finds the task already running
> * but looks just after the last call to func
> */
> -static void do_task(struct tasklet_struct *t)
> +static void do_task(struct work_struct *w)
> {
> int cont;
> int ret;
> - struct rxe_task *task = from_tasklet(task, t, tasklet);
> + struct rxe_task *task = container_of(w, typeof(*task), work);
> struct rxe_qp *qp = (struct rxe_qp *)task->arg;
> unsigned int iterations = RXE_MAX_ITERATIONS;
>
> @@ -64,10 +80,10 @@ static void do_task(struct tasklet_struct *t)
> } else if (iterations--) {
> cont = 1;
> } else {
> - /* reschedule the tasklet and exit
> + /* reschedule the work item and exit
> * the loop to give up the cpu
> */
> - tasklet_schedule(&task->tasklet);
> + queue_work(task->workq, &task->work);
> task->state = TASK_STATE_START;
> }
> break;
> @@ -97,7 +113,8 @@ int rxe_init_task(struct rxe_task *task, void *arg, int (*func)(void *))
> task->func = func;
> task->destroyed = false;
>
> - tasklet_setup(&task->tasklet, do_task);
> + INIT_WORK(&task->work, do_task);
> + task->workq = rxe_wq;
>
> task->state = TASK_STATE_START;
> spin_lock_init(&task->lock);
> @@ -111,17 +128,16 @@ void rxe_cleanup_task(struct rxe_task *task)
>
> /*
> * Mark the task, then wait for it to finish. It might be
> - * running in a non-tasklet (direct call) context.
> + * running in a non-workqueue (direct call) context.
> */
> task->destroyed = true;
> + flush_workqueue(task->workq);
>
> do {
> spin_lock_bh(&task->lock);
> idle = (task->state == TASK_STATE_START);
> spin_unlock_bh(&task->lock);
> } while (!idle);
> -
> - tasklet_kill(&task->tasklet);
> }
>
> void rxe_run_task(struct rxe_task *task)
> @@ -129,7 +145,7 @@ void rxe_run_task(struct rxe_task *task)
> if (task->destroyed)
> return;
>
> - do_task(&task->tasklet);
> + do_task(&task->work);
> }
>
> void rxe_sched_task(struct rxe_task *task)
> @@ -137,15 +153,27 @@ void rxe_sched_task(struct rxe_task *task)
> if (task->destroyed)
> return;
>
> - tasklet_schedule(&task->tasklet);
> + /*
> + * busy-loop while qp reset is in progress.
> + * This may be called from softirq context and thus cannot sleep.
> + */
> + while (atomic_read(&task->suspended))
> + cpu_relax();
> +
> + queue_work(task->workq, &task->work);
> }
>
> void rxe_disable_task(struct rxe_task *task)
> {
> - tasklet_disable(&task->tasklet);
> + /* Alternative to tasklet_disable() */
> + atomic_inc(&task->suspended);
> + smp_mb__after_atomic();
> + flush_workqueue(task->workq);
> }
>
> void rxe_enable_task(struct rxe_task *task)
> {
> - tasklet_enable(&task->tasklet);
> + /* Alternative to tasklet_enable() */
> + smp_mb__before_atomic();
> + atomic_dec(&task->suspended);
> }
> diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
> index 7b88129702ac..9aa3f236e886 100644
> --- a/drivers/infiniband/sw/rxe/rxe_task.h
> +++ b/drivers/infiniband/sw/rxe/rxe_task.h
> @@ -19,15 +19,22 @@ enum {
> * called again.
> */
> struct rxe_task {
> - struct tasklet_struct tasklet;
> + struct workqueue_struct *workq;
> + struct work_struct work;
> int state;
> spinlock_t lock;
> void *arg;
> int (*func)(void *arg);
> int ret;
> bool destroyed;
> + /* used to {dis, en}able per-qp work items */
> + atomic_t suspended;
> };
>
> +int rxe_alloc_wq(void);
> +
> +void rxe_destroy_wq(void);
> +
> /*
> * init rxe_task structure
> * arg => parameter to pass to fcn
> @@ -40,18 +47,20 @@ void rxe_cleanup_task(struct rxe_task *task);
>
> /*
> * raw call to func in loop without any checking
> - * can call when tasklets are disabled
> + * can call when tasks are suspended
> */
> int __rxe_do_task(struct rxe_task *task);
>
> +/* run a task without scheduling */
> void rxe_run_task(struct rxe_task *task);
>
> +/* schedule a task into workqueue */
> void rxe_sched_task(struct rxe_task *task);
>
> /* keep a task from scheduling */
> void rxe_disable_task(struct rxe_task *task);
>
> -/* allow task to run */
> +/* allow a task to run again */
> void rxe_enable_task(struct rxe_task *task);
>
> #endif /* RXE_TASK_H */

2022-12-29 02:10:54

by Zhu Yanjun

[permalink] [raw]
Subject: Re: [PATCH for-next v3 1/7] RDMA/rxe: Convert triple tasklets to use workqueue

On Thu, Dec 29, 2022 at 12:56 AM Bob Pearson <[email protected]> wrote:
>
> On 12/23/22 00:51, Daisuke Matsuda wrote:
> > In order to implement On-Demand Paging on the rxe driver, triple tasklets
> > (requester, responder, and completer) must be allowed to sleep so that they
> > can trigger page fault when pages being accessed are not present.
> >
> > This patch replaces the tasklets with a workqueue, but still allows direct-
> > call of works from softirq context if it is obvious that MRs are not going
> > to be accessed and there is no work being processed in the workqueue.
>
> There are already at least two patch sets that do this waiting to get upstream.

RXE accepted several patch sets just now. So it needs time to make
tests and check bugs.

Zhu Yanjun

> Bob
>
> >
> > As counterparts to tasklet_disable() and tasklet_enable() are missing for
> > workqueues, an atomic value is introduced to prevent work items from being
> > scheduled while qp reset is in progress.
> >
> > The way to initialize/destroy workqueue is picked up from the
> > implementation of Ian Ziemba and Bob Pearson at HPE.
> >
> > Link: https://lore.kernel.org/all/[email protected]/
> > Signed-off-by: Daisuke Matsuda <[email protected]>
> > ---
> > drivers/infiniband/sw/rxe/rxe.c | 9 ++++-
> > drivers/infiniband/sw/rxe/rxe_comp.c | 2 +-
> > drivers/infiniband/sw/rxe/rxe_param.h | 2 +-
> > drivers/infiniband/sw/rxe/rxe_qp.c | 2 +-
> > drivers/infiniband/sw/rxe/rxe_req.c | 2 +-
> > drivers/infiniband/sw/rxe/rxe_resp.c | 2 +-
> > drivers/infiniband/sw/rxe/rxe_task.c | 52 ++++++++++++++++++++-------
> > drivers/infiniband/sw/rxe/rxe_task.h | 15 ++++++--
> > 8 files changed, 65 insertions(+), 21 deletions(-)
> >
> > diff --git a/drivers/infiniband/sw/rxe/rxe.c b/drivers/infiniband/sw/rxe/rxe.c
> > index 136c2efe3466..3c7e42e5b0c7 100644
> > --- a/drivers/infiniband/sw/rxe/rxe.c
> > +++ b/drivers/infiniband/sw/rxe/rxe.c
> > @@ -210,10 +210,16 @@ static int __init rxe_module_init(void)
> > {
> > int err;
> >
> > - err = rxe_net_init();
> > + err = rxe_alloc_wq();
> > if (err)
> > return err;
> >
> > + err = rxe_net_init();
> > + if (err) {
> > + rxe_destroy_wq();
> > + return err;
> > + }
> > +
> > rdma_link_register(&rxe_link_ops);
> > pr_info("loaded\n");
> > return 0;
> > @@ -224,6 +230,7 @@ static void __exit rxe_module_exit(void)
> > rdma_link_unregister(&rxe_link_ops);
> > ib_unregister_driver(RDMA_DRIVER_RXE);
> > rxe_net_exit();
> > + rxe_destroy_wq();
> >
> > pr_info("unloaded\n");
> > }
> > diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
> > index 20737fec392b..046bbacce37c 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_comp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_comp.c
> > @@ -773,7 +773,7 @@ int rxe_completer(void *arg)
> > }
> >
> > /* A non-zero return value will cause rxe_do_task to
> > - * exit its loop and end the tasklet. A zero return
> > + * exit its loop and end the work item. A zero return
> > * will continue looping and return to rxe_completer
> > */
> > done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_param.h b/drivers/infiniband/sw/rxe/rxe_param.h
> > index a754fc902e3d..bd8050e99d6b 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_param.h
> > +++ b/drivers/infiniband/sw/rxe/rxe_param.h
> > @@ -112,7 +112,7 @@ enum rxe_device_param {
> > RXE_INFLIGHT_SKBS_PER_QP_HIGH = 64,
> > RXE_INFLIGHT_SKBS_PER_QP_LOW = 16,
> >
> > - /* Max number of interations of each tasklet
> > + /* Max number of interations of each work item
> > * before yielding the cpu to let other
> > * work make progress
> > */
> > diff --git a/drivers/infiniband/sw/rxe/rxe_qp.c b/drivers/infiniband/sw/rxe/rxe_qp.c
> > index ab72db68b58f..e033b2449dfe 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_qp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_qp.c
> > @@ -471,7 +471,7 @@ int rxe_qp_chk_attr(struct rxe_dev *rxe, struct rxe_qp *qp,
> > /* move the qp to the reset state */
> > static void rxe_qp_reset(struct rxe_qp *qp)
> > {
> > - /* stop tasks from running */
> > + /* flush workqueue and stop tasks from running */
> > rxe_disable_task(&qp->resp.task);
> >
> > /* stop request/comp */
> > diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
> > index 899c8779f800..2bcd287a2c3b 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_req.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_req.c
> > @@ -830,7 +830,7 @@ int rxe_requester(void *arg)
> > update_state(qp, &pkt);
> >
> > /* A non-zero return value will cause rxe_do_task to
> > - * exit its loop and end the tasklet. A zero return
> > + * exit its loop and end the work item. A zero return
> > * will continue looping and return to rxe_requester
> > */
> > done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
> > index c74972244f08..d9134a00a529 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_resp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_resp.c
> > @@ -1691,7 +1691,7 @@ int rxe_responder(void *arg)
> > }
> >
> > /* A non-zero return value will cause rxe_do_task to
> > - * exit its loop and end the tasklet. A zero return
> > + * exit its loop and end the work item. A zero return
> > * will continue looping and return to rxe_responder
> > */
> > done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
> > index 60b90e33a884..b96f72aa9005 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_task.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_task.c
> > @@ -6,6 +6,22 @@
> >
> > #include "rxe.h"
> >
> > +static struct workqueue_struct *rxe_wq;
> > +
> > +int rxe_alloc_wq(void)
> > +{
> > + rxe_wq = alloc_workqueue("rxe_wq", WQ_CPU_INTENSIVE, WQ_MAX_ACTIVE);
> > + if (!rxe_wq)
> > + return -ENOMEM;
> > +
> > + return 0;
> > +}
> > +
> > +void rxe_destroy_wq(void)
> > +{
> > + destroy_workqueue(rxe_wq);
> > +}
> > +
> > int __rxe_do_task(struct rxe_task *task)
> >
> > {
> > @@ -24,11 +40,11 @@ int __rxe_do_task(struct rxe_task *task)
> > * a second caller finds the task already running
> > * but looks just after the last call to func
> > */
> > -static void do_task(struct tasklet_struct *t)
> > +static void do_task(struct work_struct *w)
> > {
> > int cont;
> > int ret;
> > - struct rxe_task *task = from_tasklet(task, t, tasklet);
> > + struct rxe_task *task = container_of(w, typeof(*task), work);
> > struct rxe_qp *qp = (struct rxe_qp *)task->arg;
> > unsigned int iterations = RXE_MAX_ITERATIONS;
> >
> > @@ -64,10 +80,10 @@ static void do_task(struct tasklet_struct *t)
> > } else if (iterations--) {
> > cont = 1;
> > } else {
> > - /* reschedule the tasklet and exit
> > + /* reschedule the work item and exit
> > * the loop to give up the cpu
> > */
> > - tasklet_schedule(&task->tasklet);
> > + queue_work(task->workq, &task->work);
> > task->state = TASK_STATE_START;
> > }
> > break;
> > @@ -97,7 +113,8 @@ int rxe_init_task(struct rxe_task *task, void *arg, int (*func)(void *))
> > task->func = func;
> > task->destroyed = false;
> >
> > - tasklet_setup(&task->tasklet, do_task);
> > + INIT_WORK(&task->work, do_task);
> > + task->workq = rxe_wq;
> >
> > task->state = TASK_STATE_START;
> > spin_lock_init(&task->lock);
> > @@ -111,17 +128,16 @@ void rxe_cleanup_task(struct rxe_task *task)
> >
> > /*
> > * Mark the task, then wait for it to finish. It might be
> > - * running in a non-tasklet (direct call) context.
> > + * running in a non-workqueue (direct call) context.
> > */
> > task->destroyed = true;
> > + flush_workqueue(task->workq);
> >
> > do {
> > spin_lock_bh(&task->lock);
> > idle = (task->state == TASK_STATE_START);
> > spin_unlock_bh(&task->lock);
> > } while (!idle);
> > -
> > - tasklet_kill(&task->tasklet);
> > }
> >
> > void rxe_run_task(struct rxe_task *task)
> > @@ -129,7 +145,7 @@ void rxe_run_task(struct rxe_task *task)
> > if (task->destroyed)
> > return;
> >
> > - do_task(&task->tasklet);
> > + do_task(&task->work);
> > }
> >
> > void rxe_sched_task(struct rxe_task *task)
> > @@ -137,15 +153,27 @@ void rxe_sched_task(struct rxe_task *task)
> > if (task->destroyed)
> > return;
> >
> > - tasklet_schedule(&task->tasklet);
> > + /*
> > + * busy-loop while qp reset is in progress.
> > + * This may be called from softirq context and thus cannot sleep.
> > + */
> > + while (atomic_read(&task->suspended))
> > + cpu_relax();
> > +
> > + queue_work(task->workq, &task->work);
> > }
> >
> > void rxe_disable_task(struct rxe_task *task)
> > {
> > - tasklet_disable(&task->tasklet);
> > + /* Alternative to tasklet_disable() */
> > + atomic_inc(&task->suspended);
> > + smp_mb__after_atomic();
> > + flush_workqueue(task->workq);
> > }
> >
> > void rxe_enable_task(struct rxe_task *task)
> > {
> > - tasklet_enable(&task->tasklet);
> > + /* Alternative to tasklet_enable() */
> > + smp_mb__before_atomic();
> > + atomic_dec(&task->suspended);
> > }
> > diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
> > index 7b88129702ac..9aa3f236e886 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_task.h
> > +++ b/drivers/infiniband/sw/rxe/rxe_task.h
> > @@ -19,15 +19,22 @@ enum {
> > * called again.
> > */
> > struct rxe_task {
> > - struct tasklet_struct tasklet;
> > + struct workqueue_struct *workq;
> > + struct work_struct work;
> > int state;
> > spinlock_t lock;
> > void *arg;
> > int (*func)(void *arg);
> > int ret;
> > bool destroyed;
> > + /* used to {dis, en}able per-qp work items */
> > + atomic_t suspended;
> > };
> >
> > +int rxe_alloc_wq(void);
> > +
> > +void rxe_destroy_wq(void);
> > +
> > /*
> > * init rxe_task structure
> > * arg => parameter to pass to fcn
> > @@ -40,18 +47,20 @@ void rxe_cleanup_task(struct rxe_task *task);
> >
> > /*
> > * raw call to func in loop without any checking
> > - * can call when tasklets are disabled
> > + * can call when tasks are suspended
> > */
> > int __rxe_do_task(struct rxe_task *task);
> >
> > +/* run a task without scheduling */
> > void rxe_run_task(struct rxe_task *task);
> >
> > +/* schedule a task into workqueue */
> > void rxe_sched_task(struct rxe_task *task);
> >
> > /* keep a task from scheduling */
> > void rxe_disable_task(struct rxe_task *task);
> >
> > -/* allow task to run */
> > +/* allow a task to run again */
> > void rxe_enable_task(struct rxe_task *task);
> >
> > #endif /* RXE_TASK_H */
>

2022-12-29 07:32:35

by Leon Romanovsky

[permalink] [raw]
Subject: Re: [PATCH for-next v3 1/7] RDMA/rxe: Convert triple tasklets to use workqueue

On Wed, Dec 28, 2022 at 10:56:11AM -0600, Bob Pearson wrote:
> On 12/23/22 00:51, Daisuke Matsuda wrote:
> > In order to implement On-Demand Paging on the rxe driver, triple tasklets
> > (requester, responder, and completer) must be allowed to sleep so that they
> > can trigger page fault when pages being accessed are not present.
> >
> > This patch replaces the tasklets with a workqueue, but still allows direct-
> > call of works from softirq context if it is obvious that MRs are not going
> > to be accessed and there is no work being processed in the workqueue.
>
> There are already at least two patch sets that do this waiting to get upstream

I don't see any unhandled RXE series, except of this one patch [1],
which is one out larger series.

[1] https://patchwork.kernel.org/project/linux-rdma/patch/[email protected]/

> Bob

Subject: RE: [PATCH for-next v3 1/7] RDMA/rxe: Convert triple tasklets to use workqueue

On Thu, Dec 29, 2022 1:56 AM Bob Pearson wrote:
>
> On 12/23/22 00:51, Daisuke Matsuda wrote:
> > In order to implement On-Demand Paging on the rxe driver, triple tasklets
> > (requester, responder, and completer) must be allowed to sleep so that they
> > can trigger page fault when pages being accessed are not present.
> >
> > This patch replaces the tasklets with a workqueue, but still allows direct-
> > call of works from softirq context if it is obvious that MRs are not going
> > to be accessed and there is no work being processed in the workqueue.
>
> There are already at least two patch sets that do this waiting to get upstream.
> Bob

I wrote my intention at the first part of the cover letter.
Cf. https://lore.kernel.org/lkml/[email protected]/

Your patch set introduces a soft lockup issue. It would take much more time to find
the root cause than to simply convert the tasklets to a workqueue with this patch.
My ODP patches have been stuck for almost 4 months because of this issue, and I am
not willing to wait any longer.

Daisuke

>
> >
> > As counterparts to tasklet_disable() and tasklet_enable() are missing for
> > workqueues, an atomic value is introduced to prevent work items from being
> > scheduled while qp reset is in progress.
> >
> > The way to initialize/destroy workqueue is picked up from the
> > implementation of Ian Ziemba and Bob Pearson at HPE.
> >
> > Link: https://lore.kernel.org/all/[email protected]/
> > Signed-off-by: Daisuke Matsuda <[email protected]>
> > ---
> > drivers/infiniband/sw/rxe/rxe.c | 9 ++++-
> > drivers/infiniband/sw/rxe/rxe_comp.c | 2 +-
> > drivers/infiniband/sw/rxe/rxe_param.h | 2 +-
> > drivers/infiniband/sw/rxe/rxe_qp.c | 2 +-
> > drivers/infiniband/sw/rxe/rxe_req.c | 2 +-
> > drivers/infiniband/sw/rxe/rxe_resp.c | 2 +-
> > drivers/infiniband/sw/rxe/rxe_task.c | 52 ++++++++++++++++++++-------
> > drivers/infiniband/sw/rxe/rxe_task.h | 15 ++++++--
> > 8 files changed, 65 insertions(+), 21 deletions(-)
> >
> > diff --git a/drivers/infiniband/sw/rxe/rxe.c b/drivers/infiniband/sw/rxe/rxe.c
> > index 136c2efe3466..3c7e42e5b0c7 100644
> > --- a/drivers/infiniband/sw/rxe/rxe.c
> > +++ b/drivers/infiniband/sw/rxe/rxe.c
> > @@ -210,10 +210,16 @@ static int __init rxe_module_init(void)
> > {
> > int err;
> >
> > - err = rxe_net_init();
> > + err = rxe_alloc_wq();
> > if (err)
> > return err;
> >
> > + err = rxe_net_init();
> > + if (err) {
> > + rxe_destroy_wq();
> > + return err;
> > + }
> > +
> > rdma_link_register(&rxe_link_ops);
> > pr_info("loaded\n");
> > return 0;
> > @@ -224,6 +230,7 @@ static void __exit rxe_module_exit(void)
> > rdma_link_unregister(&rxe_link_ops);
> > ib_unregister_driver(RDMA_DRIVER_RXE);
> > rxe_net_exit();
> > + rxe_destroy_wq();
> >
> > pr_info("unloaded\n");
> > }
> > diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
> > index 20737fec392b..046bbacce37c 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_comp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_comp.c
> > @@ -773,7 +773,7 @@ int rxe_completer(void *arg)
> > }
> >
> > /* A non-zero return value will cause rxe_do_task to
> > - * exit its loop and end the tasklet. A zero return
> > + * exit its loop and end the work item. A zero return
> > * will continue looping and return to rxe_completer
> > */
> > done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_param.h b/drivers/infiniband/sw/rxe/rxe_param.h
> > index a754fc902e3d..bd8050e99d6b 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_param.h
> > +++ b/drivers/infiniband/sw/rxe/rxe_param.h
> > @@ -112,7 +112,7 @@ enum rxe_device_param {
> > RXE_INFLIGHT_SKBS_PER_QP_HIGH = 64,
> > RXE_INFLIGHT_SKBS_PER_QP_LOW = 16,
> >
> > - /* Max number of interations of each tasklet
> > + /* Max number of interations of each work item
> > * before yielding the cpu to let other
> > * work make progress
> > */
> > diff --git a/drivers/infiniband/sw/rxe/rxe_qp.c b/drivers/infiniband/sw/rxe/rxe_qp.c
> > index ab72db68b58f..e033b2449dfe 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_qp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_qp.c
> > @@ -471,7 +471,7 @@ int rxe_qp_chk_attr(struct rxe_dev *rxe, struct rxe_qp *qp,
> > /* move the qp to the reset state */
> > static void rxe_qp_reset(struct rxe_qp *qp)
> > {
> > - /* stop tasks from running */
> > + /* flush workqueue and stop tasks from running */
> > rxe_disable_task(&qp->resp.task);
> >
> > /* stop request/comp */
> > diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
> > index 899c8779f800..2bcd287a2c3b 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_req.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_req.c
> > @@ -830,7 +830,7 @@ int rxe_requester(void *arg)
> > update_state(qp, &pkt);
> >
> > /* A non-zero return value will cause rxe_do_task to
> > - * exit its loop and end the tasklet. A zero return
> > + * exit its loop and end the work item. A zero return
> > * will continue looping and return to rxe_requester
> > */
> > done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
> > index c74972244f08..d9134a00a529 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_resp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_resp.c
> > @@ -1691,7 +1691,7 @@ int rxe_responder(void *arg)
> > }
> >
> > /* A non-zero return value will cause rxe_do_task to
> > - * exit its loop and end the tasklet. A zero return
> > + * exit its loop and end the work item. A zero return
> > * will continue looping and return to rxe_responder
> > */
> > done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
> > index 60b90e33a884..b96f72aa9005 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_task.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_task.c
> > @@ -6,6 +6,22 @@
> >
> > #include "rxe.h"
> >
> > +static struct workqueue_struct *rxe_wq;
> > +
> > +int rxe_alloc_wq(void)
> > +{
> > + rxe_wq = alloc_workqueue("rxe_wq", WQ_CPU_INTENSIVE, WQ_MAX_ACTIVE);
> > + if (!rxe_wq)
> > + return -ENOMEM;
> > +
> > + return 0;
> > +}
> > +
> > +void rxe_destroy_wq(void)
> > +{
> > + destroy_workqueue(rxe_wq);
> > +}
> > +
> > int __rxe_do_task(struct rxe_task *task)
> >
> > {
> > @@ -24,11 +40,11 @@ int __rxe_do_task(struct rxe_task *task)
> > * a second caller finds the task already running
> > * but looks just after the last call to func
> > */
> > -static void do_task(struct tasklet_struct *t)
> > +static void do_task(struct work_struct *w)
> > {
> > int cont;
> > int ret;
> > - struct rxe_task *task = from_tasklet(task, t, tasklet);
> > + struct rxe_task *task = container_of(w, typeof(*task), work);
> > struct rxe_qp *qp = (struct rxe_qp *)task->arg;
> > unsigned int iterations = RXE_MAX_ITERATIONS;
> >
> > @@ -64,10 +80,10 @@ static void do_task(struct tasklet_struct *t)
> > } else if (iterations--) {
> > cont = 1;
> > } else {
> > - /* reschedule the tasklet and exit
> > + /* reschedule the work item and exit
> > * the loop to give up the cpu
> > */
> > - tasklet_schedule(&task->tasklet);
> > + queue_work(task->workq, &task->work);
> > task->state = TASK_STATE_START;
> > }
> > break;
> > @@ -97,7 +113,8 @@ int rxe_init_task(struct rxe_task *task, void *arg, int (*func)(void *))
> > task->func = func;
> > task->destroyed = false;
> >
> > - tasklet_setup(&task->tasklet, do_task);
> > + INIT_WORK(&task->work, do_task);
> > + task->workq = rxe_wq;
> >
> > task->state = TASK_STATE_START;
> > spin_lock_init(&task->lock);
> > @@ -111,17 +128,16 @@ void rxe_cleanup_task(struct rxe_task *task)
> >
> > /*
> > * Mark the task, then wait for it to finish. It might be
> > - * running in a non-tasklet (direct call) context.
> > + * running in a non-workqueue (direct call) context.
> > */
> > task->destroyed = true;
> > + flush_workqueue(task->workq);
> >
> > do {
> > spin_lock_bh(&task->lock);
> > idle = (task->state == TASK_STATE_START);
> > spin_unlock_bh(&task->lock);
> > } while (!idle);
> > -
> > - tasklet_kill(&task->tasklet);
> > }
> >
> > void rxe_run_task(struct rxe_task *task)
> > @@ -129,7 +145,7 @@ void rxe_run_task(struct rxe_task *task)
> > if (task->destroyed)
> > return;
> >
> > - do_task(&task->tasklet);
> > + do_task(&task->work);
> > }
> >
> > void rxe_sched_task(struct rxe_task *task)
> > @@ -137,15 +153,27 @@ void rxe_sched_task(struct rxe_task *task)
> > if (task->destroyed)
> > return;
> >
> > - tasklet_schedule(&task->tasklet);
> > + /*
> > + * busy-loop while qp reset is in progress.
> > + * This may be called from softirq context and thus cannot sleep.
> > + */
> > + while (atomic_read(&task->suspended))
> > + cpu_relax();
> > +
> > + queue_work(task->workq, &task->work);
> > }
> >
> > void rxe_disable_task(struct rxe_task *task)
> > {
> > - tasklet_disable(&task->tasklet);
> > + /* Alternative to tasklet_disable() */
> > + atomic_inc(&task->suspended);
> > + smp_mb__after_atomic();
> > + flush_workqueue(task->workq);
> > }
> >
> > void rxe_enable_task(struct rxe_task *task)
> > {
> > - tasklet_enable(&task->tasklet);
> > + /* Alternative to tasklet_enable() */
> > + smp_mb__before_atomic();
> > + atomic_dec(&task->suspended);
> > }
> > diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
> > index 7b88129702ac..9aa3f236e886 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_task.h
> > +++ b/drivers/infiniband/sw/rxe/rxe_task.h
> > @@ -19,15 +19,22 @@ enum {
> > * called again.
> > */
> > struct rxe_task {
> > - struct tasklet_struct tasklet;
> > + struct workqueue_struct *workq;
> > + struct work_struct work;
> > int state;
> > spinlock_t lock;
> > void *arg;
> > int (*func)(void *arg);
> > int ret;
> > bool destroyed;
> > + /* used to {dis, en}able per-qp work items */
> > + atomic_t suspended;
> > };
> >
> > +int rxe_alloc_wq(void);
> > +
> > +void rxe_destroy_wq(void);
> > +
> > /*
> > * init rxe_task structure
> > * arg => parameter to pass to fcn
> > @@ -40,18 +47,20 @@ void rxe_cleanup_task(struct rxe_task *task);
> >
> > /*
> > * raw call to func in loop without any checking
> > - * can call when tasklets are disabled
> > + * can call when tasks are suspended
> > */
> > int __rxe_do_task(struct rxe_task *task);
> >
> > +/* run a task without scheduling */
> > void rxe_run_task(struct rxe_task *task);
> >
> > +/* schedule a task into workqueue */
> > void rxe_sched_task(struct rxe_task *task);
> >
> > /* keep a task from scheduling */
> > void rxe_disable_task(struct rxe_task *task);
> >
> > -/* allow task to run */
> > +/* allow a task to run again */
> > void rxe_enable_task(struct rxe_task *task);
> >
> > #endif /* RXE_TASK_H */

2023-01-16 18:53:19

by Jason Gunthorpe

[permalink] [raw]
Subject: Re: [PATCH for-next v3 4/7] RDMA/rxe: Add page invalidation support

On Fri, Dec 23, 2022 at 03:51:55PM +0900, Daisuke Matsuda wrote:

> +static bool rxe_ib_invalidate_range(struct mmu_interval_notifier *mni,
> + const struct mmu_notifier_range *range,
> + unsigned long cur_seq)
> +{
> + struct ib_umem_odp *umem_odp =
> + container_of(mni, struct ib_umem_odp, notifier);
> + unsigned long start;
> + unsigned long end;
> +
> + if (!mmu_notifier_range_blockable(range))
> + return false;
> +
> + mutex_lock(&umem_odp->umem_mutex);
> + mmu_interval_set_seq(mni, cur_seq);
> +
> + start = max_t(u64, ib_umem_start(umem_odp), range->start);
> + end = min_t(u64, ib_umem_end(umem_odp), range->end);
> +
> + ib_umem_odp_unmap_dma_pages(umem_odp, start, end);

After bob's xarray conversion this can be done alot faster, it just an
xa_for_each_range and make the xarray items non-present

non-present is probably just a null struct page in the xarray.

Jason

2023-01-16 19:08:14

by Jason Gunthorpe

[permalink] [raw]
Subject: Re: [PATCH for-next v3 3/7] RDMA/rxe: Cleanup code for responder Atomic operations

On Fri, Dec 23, 2022 at 03:51:54PM +0900, Daisuke Matsuda wrote:
> @@ -733,60 +734,83 @@ static enum resp_states process_flush(struct rxe_qp *qp,
> /* Guarantee atomicity of atomic operations at the machine level. */
> static DEFINE_SPINLOCK(atomic_ops_lock);
>
> -static enum resp_states atomic_reply(struct rxe_qp *qp,
> - struct rxe_pkt_info *pkt)
> +enum resp_states rxe_process_atomic(struct rxe_qp *qp,
> + struct rxe_pkt_info *pkt, u64 *vaddr)
> {
> - u64 *vaddr;
> enum resp_states ret;
> - struct rxe_mr *mr = qp->resp.mr;
> struct resp_res *res = qp->resp.res;
> u64 value;
>
> - if (!res) {
> - res = rxe_prepare_res(qp, pkt, RXE_ATOMIC_MASK);
> - qp->resp.res = res;
> + /* check vaddr is 8 bytes aligned. */
> + if (!vaddr || (uintptr_t)vaddr & 7) {
> + ret = RESPST_ERR_MISALIGNED_ATOMIC;
> + goto out;
> }
>
> - if (!res->replay) {
> - if (mr->state != RXE_MR_STATE_VALID) {
> - ret = RESPST_ERR_RKEY_VIOLATION;
> - goto out;
> - }
> + spin_lock(&atomic_ops_lock);
> + res->atomic.orig_val = value = *vaddr;
>
> - vaddr = iova_to_vaddr(mr, qp->resp.va + qp->resp.offset,
> - sizeof(u64));

I think you need to properly fix the lifetime problem with iova_to_vaddr
function, not hack around it like this.

iova_to_vaddr should be able to return an IOVA for ODP just fine - the
reason it can't is the same bug it has with normal MRs, the mapping
can just change under the feet and there is no protective locking.

If you are going to follow the same ODP design as mlx5 then
fundamentally all ODP does to the MR is add a not-present bit and
allow the MR pages to churn rapidly.

Make the MR safe to changes in the page references against races and
ODP will work just fine.

This will be easier on top of Bob's xarray patch, please check what he
has there and test it.

Thanks,
Jason