2023-08-04 13:50:51

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 00/18] ceph, rbd: Collapse all the I/O types down to something iov_iter-based

Hi Ilya, Xiubo,

[!] NOTE: This is a preview of a work in progress and doesn't yet fully
compile, let alone actually work!

Here are some patches that (mostly) collapse the different I/O types
(PAGES, PAGELIST, BVECS, BIO) down to a single one. I added a new type,
ceph_databuf, to make this easier. The page list is attached to that as a
bio_vec[] with an iov_iter, but could also be some other type supported by
the iov_iter. The iov_iter defines the data or buffer to be used. I have
an additional iov_iter type implemented that allows use of a straight
folio[] or page[] instead of a bio_vec[] that I can deploy if that proves
more useful.

The conversion isn't quite complete:

(1) rbd is done; BVECS and BIO types are replaced with ceph_databuf.

(2) ceph_osd_linger_request::preply_pages needs switching over to a
ceph_databuf, but I haven't yet managed to work out how the pages that
handle_watch_notify() sticks in there come about.

(3) I haven't altered data transmission in net/ceph/messenger*.c yet. The
aim is to reduce it to a single sendmsg() call for each ceph_msg_data
struct, using the iov_iter therein.

(4) The data reception routines in net/ceph/messenger*.c also need
modifying to pass each ceph_msg_data::iter to recvmsg() in turn.

(5) It might be possible to merge struct ceph_databuf into struct
ceph_msg_data and eliminate the former.

(6) fs/ceph/ still needs some work to clean up the use of page arrays.

(7) I would like to replace the front and middle buffers with a ceph_databuf,
vmapping them when we need to access them.

I added a kmap_ceph_databuf_page() macro that fetches a page from the bvec[]
hidden inside a ceph_databuf and maps it with kmap_local_page(). Hiding the
bvec[] behind this macro should make it easier to replace later.

Anyway, if anyone has any thoughts...


I've pushed the patches here also:

https://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs.git/log/?h=iov-extract

David

David Howells (18):
iov_iter: Add function to see if buffer is all zeros
ceph: Rename alignment to offset
ceph: Add a new data container type, ceph_databuf
ceph: Convert ceph_mds_request::r_pagelist to a databuf
rbd: Use ceph_databuf for rbd_obj_read_sync()
ceph: Change ceph_osdc_call()'s reply to a ceph_databuf
ceph: Unexport osd_req_op_cls_request_data_pages()
ceph: Remove osd_req_op_cls_response_data_pages()
ceph: Convert notify_id_pages to a ceph_databuf
rbd: Switch from using bvec_iter to iov_iter
ceph: Remove bvec and bio data container types
ceph: Convert some page arrays to ceph_databuf
ceph: Convert users of ceph_pagelist to ceph_databuf
ceph: Remove ceph_pagelist
ceph: Convert ceph_osdc_notify() reply to ceph_databuf
ceph: Remove CEPH_OSD_DATA_TYPE_PAGES and its attendant helpers
ceph: Remove CEPH_MSG_DATA_PAGES and its helpers
ceph: Don't use data_pages

drivers/block/rbd.c | 645 ++++++++++----------------------
fs/ceph/acl.c | 39 +-
fs/ceph/addr.c | 18 +-
fs/ceph/file.c | 157 ++++----
fs/ceph/inode.c | 85 ++---
fs/ceph/locks.c | 23 +-
fs/ceph/mds_client.c | 134 ++++---
fs/ceph/mds_client.h | 2 +-
fs/ceph/super.h | 8 +-
fs/ceph/xattr.c | 68 ++--
include/linux/ceph/databuf.h | 65 ++++
include/linux/ceph/messenger.h | 141 +------
include/linux/ceph/osd_client.h | 97 ++---
include/linux/ceph/pagelist.h | 72 ----
include/linux/uio.h | 1 +
lib/iov_iter.c | 22 ++
net/ceph/Makefile | 5 +-
net/ceph/cls_lock_client.c | 40 +-
net/ceph/databuf.c | 149 ++++++++
net/ceph/messenger.c | 376 +------------------
net/ceph/osd_client.c | 430 +++++++--------------
net/ceph/pagelist.c | 171 ---------
22 files changed, 876 insertions(+), 1872 deletions(-)
create mode 100644 include/linux/ceph/databuf.h
delete mode 100644 include/linux/ceph/pagelist.h
create mode 100644 net/ceph/databuf.c
delete mode 100644 net/ceph/pagelist.c



2023-08-04 13:50:59

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 05/18] rbd: Use ceph_databuf for rbd_obj_read_sync()

Supply a ceph_databuf to rbd_obj_read_sync() to convey the data.

Signed-off-by: David Howells <[email protected]>
---
drivers/block/rbd.c | 45 ++++++++++++++++++++-------------------------
1 file changed, 20 insertions(+), 25 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 08d0908d0583..2a161b03dd7a 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -4762,13 +4762,10 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
- void *buf, int buf_len)
-
+ struct ceph_databuf *dbuf, int len)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct ceph_osd_request *req;
- struct page **pages;
- int num_pages = calc_pages_for(0, buf_len);
int ret;

req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
@@ -4779,15 +4776,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = CEPH_OSD_FLAG_READ;

- pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
- if (IS_ERR(pages)) {
- ret = PTR_ERR(pages);
- goto out_req;
- }
-
- osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
- osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
- true);
+ osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, len, 0, 0);
+ osd_req_op_extent_osd_databuf(req, 0, dbuf);

ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
if (ret)
@@ -4795,9 +4785,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,

ceph_osdc_start_request(osdc, req);
ret = ceph_osdc_wait_request(osdc, req);
- if (ret >= 0)
- ceph_copy_from_page_vector(pages, buf, 0, ret);
-
out_req:
ceph_osdc_put_request(req);
return ret;
@@ -4810,12 +4797,18 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
*/
static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
{
- struct rbd_image_header_ondisk *ondisk = NULL;
+ struct rbd_image_header_ondisk *ondisk;
+ struct ceph_databuf *dbuf = NULL;
u32 snap_count = 0;
u64 names_size = 0;
u32 want_count;
int ret;

+ dbuf = ceph_databuf_alloc(1, sizeof(*ondisk), GFP_KERNEL);
+ if (!dbuf)
+ return -ENOMEM;
+ ondisk = kmap_ceph_databuf_page(dbuf, 0);
+
/*
* The complete header will include an array of its 64-bit
* snapshot ids, followed by the names of those snapshots as
@@ -4826,17 +4819,18 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
do {
size_t size;

- kfree(ondisk);
-
size = sizeof (*ondisk);
size += snap_count * sizeof (struct rbd_image_snap_ondisk);
size += names_size;
- ondisk = kmalloc(size, GFP_KERNEL);
- if (!ondisk)
- return -ENOMEM;
+
+ ret = -ENOMEM;
+ if (size > dbuf->limit &&
+ ceph_databuf_reserve(dbuf, size - dbuf->limit,
+ GFP_KERNEL) < 0)
+ goto out;

ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
- &rbd_dev->header_oloc, ondisk, size);
+ &rbd_dev->header_oloc, dbuf, size);
if (ret < 0)
goto out;
if ((size_t)ret < size) {
@@ -4845,6 +4839,7 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
size, ret);
goto out;
}
+
if (!rbd_dev_ondisk_valid(ondisk)) {
ret = -ENXIO;
rbd_warn(rbd_dev, "invalid header");
@@ -4858,8 +4853,8 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)

ret = rbd_header_from_disk(rbd_dev, ondisk);
out:
- kfree(ondisk);
-
+ kunmap_local(ondisk);
+ ceph_databuf_release(dbuf);
return ret;
}



2023-08-04 13:52:49

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 10/18] rbd: Switch from using bvec_iter to iov_iter

---
drivers/block/rbd.c | 421 +++++++++-----------------------------------
fs/ceph/file.c | 111 +++++-------
2 files changed, 127 insertions(+), 405 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 971fa4a581cf..1756973b696f 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -214,13 +214,6 @@ struct pending_result {

struct rbd_img_request;

-enum obj_request_type {
- OBJ_REQUEST_NODATA = 1,
- OBJ_REQUEST_BIO, /* pointer into provided bio (list) */
- OBJ_REQUEST_BVECS, /* pointer into provided bio_vec array */
- OBJ_REQUEST_OWN_BVECS, /* private bio_vec array, doesn't own pages */
-};
-
enum obj_operation_type {
OBJ_OP_READ = 1,
OBJ_OP_WRITE,
@@ -295,18 +288,12 @@ struct rbd_obj_request {
struct ceph_file_extent *img_extents;
u32 num_img_extents;

- union {
- struct ceph_bio_iter bio_pos;
- struct {
- struct ceph_bvec_iter bvec_pos;
- u32 bvec_count;
- u32 bvec_idx;
- };
- };
+ struct bio *bio;
+ struct bio_vec *bvec;
+ struct iov_iter iter;

enum rbd_obj_copyup_state copyup_state;
- struct bio_vec *copyup_bvecs;
- u32 copyup_bvec_count;
+ struct ceph_databuf *copyup_buf;

struct list_head osd_reqs; /* w/ r_private_item */

@@ -329,8 +316,8 @@ enum rbd_img_state {

struct rbd_img_request {
struct rbd_device *rbd_dev;
+ bool need_free_bvecs;
enum obj_operation_type op_type;
- enum obj_request_type data_type;
unsigned long flags;
enum rbd_img_state state;
union {
@@ -1218,26 +1205,6 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
rbd_dev->mapping.size = 0;
}

-static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
-{
- struct ceph_bio_iter it = *bio_pos;
-
- ceph_bio_iter_advance(&it, off);
- ceph_bio_iter_advance_step(&it, bytes, ({
- memzero_bvec(&bv);
- }));
-}
-
-static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
-{
- struct ceph_bvec_iter it = *bvec_pos;
-
- ceph_bvec_iter_advance(&it, off);
- ceph_bvec_iter_advance_step(&it, bytes, ({
- memzero_bvec(&bv);
- }));
-}
-
/*
* Zero a range in @obj_req data buffer defined by a bio (list) or
* (private) bio_vec array.
@@ -1249,17 +1216,9 @@ static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
{
dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);

- switch (obj_req->img_request->data_type) {
- case OBJ_REQUEST_BIO:
- zero_bios(&obj_req->bio_pos, off, bytes);
- break;
- case OBJ_REQUEST_BVECS:
- case OBJ_REQUEST_OWN_BVECS:
- zero_bvecs(&obj_req->bvec_pos, off, bytes);
- break;
- default:
- BUG();
- }
+ iov_iter_advance(&obj_req->iter, off);
+ iov_iter_zero(bytes, &obj_req->iter);
+ iov_iter_revert(&obj_req->iter, off);
}

static void rbd_obj_request_destroy(struct kref *kref);
@@ -1484,7 +1443,6 @@ static void rbd_obj_request_destroy(struct kref *kref)
{
struct rbd_obj_request *obj_request;
struct ceph_osd_request *osd_req;
- u32 i;

obj_request = container_of(kref, struct rbd_obj_request, kref);

@@ -1497,27 +1455,10 @@ static void rbd_obj_request_destroy(struct kref *kref)
ceph_osdc_put_request(osd_req);
}

- switch (obj_request->img_request->data_type) {
- case OBJ_REQUEST_NODATA:
- case OBJ_REQUEST_BIO:
- case OBJ_REQUEST_BVECS:
- break; /* Nothing to do */
- case OBJ_REQUEST_OWN_BVECS:
- kfree(obj_request->bvec_pos.bvecs);
- break;
- default:
- BUG();
- }
-
+ if (obj_request->img_request->need_free_bvecs)
+ kfree(obj_request->bvec);
kfree(obj_request->img_extents);
- if (obj_request->copyup_bvecs) {
- for (i = 0; i < obj_request->copyup_bvec_count; i++) {
- if (obj_request->copyup_bvecs[i].bv_page)
- __free_page(obj_request->copyup_bvecs[i].bv_page);
- }
- kfree(obj_request->copyup_bvecs);
- }
-
+ ceph_databuf_release(obj_request->copyup_buf);
kmem_cache_free(rbd_obj_request_cache, obj_request);
}

@@ -2165,29 +2106,6 @@ static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
return 0;
}

-static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
-{
- struct rbd_obj_request *obj_req = osd_req->r_priv;
-
- switch (obj_req->img_request->data_type) {
- case OBJ_REQUEST_BIO:
- osd_req_op_extent_osd_data_bio(osd_req, which,
- &obj_req->bio_pos,
- obj_req->ex.oe_len);
- break;
- case OBJ_REQUEST_BVECS:
- case OBJ_REQUEST_OWN_BVECS:
- rbd_assert(obj_req->bvec_pos.iter.bi_size ==
- obj_req->ex.oe_len);
- rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
- osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
- &obj_req->bvec_pos);
- break;
- default:
- BUG();
- }
-}
-
static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
{
struct page **pages;
@@ -2221,8 +2139,7 @@ static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
if (ret)
return ret;

- osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
- obj_req->copyup_bvec_count, bytes);
+ osd_req_op_cls_request_databuf(osd_req, which, obj_req->copyup_buf);
return 0;
}

@@ -2254,7 +2171,7 @@ static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,

osd_req_op_extent_init(osd_req, which, opcode,
obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
- rbd_osd_setup_data(osd_req, which);
+ osd_req_op_extent_osd_iter(osd_req, which, &obj_req->iter);
}

static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
@@ -2464,20 +2381,6 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
return 0;
}

-union rbd_img_fill_iter {
- struct ceph_bio_iter bio_iter;
- struct ceph_bvec_iter bvec_iter;
-};
-
-struct rbd_img_fill_ctx {
- enum obj_request_type pos_type;
- union rbd_img_fill_iter *pos;
- union rbd_img_fill_iter iter;
- ceph_object_extent_fn_t set_pos_fn;
- ceph_object_extent_fn_t count_fn;
- ceph_object_extent_fn_t copy_fn;
-};
-
static struct ceph_object_extent *alloc_object_extent(void *arg)
{
struct rbd_img_request *img_req = arg;
@@ -2491,6 +2394,19 @@ static struct ceph_object_extent *alloc_object_extent(void *arg)
return &obj_req->ex;
}

+static void set_iter_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
+{
+ struct rbd_obj_request *obj_req =
+ container_of(ex, struct rbd_obj_request, ex);
+ struct iov_iter *iter = arg;
+
+ dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
+ obj_req->iter = *iter;
+ iov_iter_truncate(&obj_req->iter, bytes);
+ obj_req->iter.nr_segs = iov_iter_npages(&obj_req->iter, INT_MAX);
+ iov_iter_advance(iter, bytes);
+}
+
/*
* While su != os && sc == 1 is technically not fancy (it's the same
* layout as su == os && sc == 1), we can't use the nocopy path for it
@@ -2506,25 +2422,22 @@ static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
struct ceph_file_extent *img_extents,
u32 num_img_extents,
- struct rbd_img_fill_ctx *fctx)
+ struct iov_iter *iter)
{
u32 i;
int ret;

- img_req->data_type = fctx->pos_type;
-
/*
* Create object requests and set each object request's starting
- * position in the provided bio (list) or bio_vec array.
+ * position in the provided iterator.
*/
- fctx->iter = *fctx->pos;
for (i = 0; i < num_img_extents; i++) {
ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
img_extents[i].fe_off,
img_extents[i].fe_len,
&img_req->object_extents,
alloc_object_extent, img_req,
- fctx->set_pos_fn, &fctx->iter);
+ set_iter_pos, iter);
if (ret)
return ret;
}
@@ -2537,30 +2450,27 @@ static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
* corresponding object requests (normally each to a different object,
* but not always) and add them to @img_req. For each object request,
* set up its data descriptor to point to the corresponding chunk(s) of
- * @fctx->pos data buffer.
+ * @iter data buffer.
*
* Because ceph_file_to_extents() will merge adjacent object extents
* together, each object request's data descriptor may point to multiple
- * different chunks of @fctx->pos data buffer.
+ * different chunks of @iter data buffer.
*
- * @fctx->pos data buffer is assumed to be large enough.
+ * @iter data buffer is assumed to be large enough.
*/
static int rbd_img_fill_request(struct rbd_img_request *img_req,
struct ceph_file_extent *img_extents,
u32 num_img_extents,
- struct rbd_img_fill_ctx *fctx)
+ struct iov_iter *iter)
{
struct rbd_device *rbd_dev = img_req->rbd_dev;
struct rbd_obj_request *obj_req;
- u32 i;
- int ret;

- if (fctx->pos_type == OBJ_REQUEST_NODATA ||
- !rbd_layout_is_fancy(&rbd_dev->layout))
+ if (!rbd_layout_is_fancy(&rbd_dev->layout))
return rbd_img_fill_request_nocopy(img_req, img_extents,
- num_img_extents, fctx);
+ num_img_extents, iter);

- img_req->data_type = OBJ_REQUEST_OWN_BVECS;
+ img_req->need_free_bvecs = true;

/*
* Create object requests and determine ->bvec_count for each object
@@ -2569,184 +2479,48 @@ static int rbd_img_fill_request(struct rbd_img_request *img_req,
* or bio_vec array because when mapped, those bio_vecs can straddle
* stripe unit boundaries.
*/
- fctx->iter = *fctx->pos;
- for (i = 0; i < num_img_extents; i++) {
- ret = ceph_file_to_extents(&rbd_dev->layout,
- img_extents[i].fe_off,
- img_extents[i].fe_len,
- &img_req->object_extents,
- alloc_object_extent, img_req,
- fctx->count_fn, &fctx->iter);
- if (ret)
- return ret;
- }
-
for_each_obj_request(img_req, obj_req) {
- obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
- sizeof(*obj_req->bvec_pos.bvecs),
- GFP_NOIO);
- if (!obj_req->bvec_pos.bvecs)
+ struct iov_iter iter = obj_req->iter;
+ obj_req->bvec = (struct bio_vec *)dup_iter(&obj_req->iter, &iter, GFP_NOIO);
+ if (!obj_req->bvec)
return -ENOMEM;
}

- /*
- * Fill in each object request's private bio_vec array, splitting and
- * rearranging the provided bio_vecs in stripe unit chunks as needed.
- */
- fctx->iter = *fctx->pos;
- for (i = 0; i < num_img_extents; i++) {
- ret = ceph_iterate_extents(&rbd_dev->layout,
- img_extents[i].fe_off,
- img_extents[i].fe_len,
- &img_req->object_extents,
- fctx->copy_fn, &fctx->iter);
- if (ret)
- return ret;
- }
-
return __rbd_img_fill_request(img_req);
}

+/*
+ * Handle ranged, but dataless ops such as DISCARD and ZEROOUT.
+ */
static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
u64 off, u64 len)
{
- struct ceph_file_extent ex = { off, len };
- union rbd_img_fill_iter dummy = {};
- struct rbd_img_fill_ctx fctx = {
- .pos_type = OBJ_REQUEST_NODATA,
- .pos = &dummy,
- };
-
- return rbd_img_fill_request(img_req, &ex, 1, &fctx);
-}
-
-static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
-{
- struct rbd_obj_request *obj_req =
- container_of(ex, struct rbd_obj_request, ex);
- struct ceph_bio_iter *it = arg;
-
- dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
- obj_req->bio_pos = *it;
- ceph_bio_iter_advance(it, bytes);
-}
-
-static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
-{
- struct rbd_obj_request *obj_req =
- container_of(ex, struct rbd_obj_request, ex);
- struct ceph_bio_iter *it = arg;
-
- dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
- ceph_bio_iter_advance_step(it, bytes, ({
- obj_req->bvec_count++;
- }));
-
-}
-
-static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
-{
- struct rbd_obj_request *obj_req =
- container_of(ex, struct rbd_obj_request, ex);
- struct ceph_bio_iter *it = arg;
+ int ret;

- dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
- ceph_bio_iter_advance_step(it, bytes, ({
- obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
- obj_req->bvec_pos.iter.bi_size += bv.bv_len;
- }));
-}
-
-static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
- struct ceph_file_extent *img_extents,
- u32 num_img_extents,
- struct ceph_bio_iter *bio_pos)
-{
- struct rbd_img_fill_ctx fctx = {
- .pos_type = OBJ_REQUEST_BIO,
- .pos = (union rbd_img_fill_iter *)bio_pos,
- .set_pos_fn = set_bio_pos,
- .count_fn = count_bio_bvecs,
- .copy_fn = copy_bio_bvecs,
- };
+ ret = ceph_file_to_extents(&img_req->rbd_dev->layout, off, len,
+ &img_req->object_extents,
+ alloc_object_extent, img_req,
+ NULL, NULL);
+ if (ret)
+ return ret;

- return rbd_img_fill_request(img_req, img_extents, num_img_extents,
- &fctx);
+ return __rbd_img_fill_request(img_req);
}

+/*
+ * Set up an iterator to access the data/buffer supplied through a bio.
+ */
static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
u64 off, u64 len, struct bio *bio)
{
struct ceph_file_extent ex = { off, len };
- struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
-
- return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
-}
-
-static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
-{
- struct rbd_obj_request *obj_req =
- container_of(ex, struct rbd_obj_request, ex);
- struct ceph_bvec_iter *it = arg;
-
- obj_req->bvec_pos = *it;
- ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
- ceph_bvec_iter_advance(it, bytes);
-}
-
-static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
-{
- struct rbd_obj_request *obj_req =
- container_of(ex, struct rbd_obj_request, ex);
- struct ceph_bvec_iter *it = arg;
+ struct iov_iter iter;

- ceph_bvec_iter_advance_step(it, bytes, ({
- obj_req->bvec_count++;
- }));
-}
-
-static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
-{
- struct rbd_obj_request *obj_req =
- container_of(ex, struct rbd_obj_request, ex);
- struct ceph_bvec_iter *it = arg;
+ iov_iter_bvec(&iter, READ, bio->bi_io_vec, bio->bi_vcnt,
+ bio->bi_iter.bi_size + bio->bi_iter.bi_bvec_done);
+ iov_iter_advance(&iter, bio->bi_iter.bi_bvec_done);

- ceph_bvec_iter_advance_step(it, bytes, ({
- obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
- obj_req->bvec_pos.iter.bi_size += bv.bv_len;
- }));
-}
-
-static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
- struct ceph_file_extent *img_extents,
- u32 num_img_extents,
- struct ceph_bvec_iter *bvec_pos)
-{
- struct rbd_img_fill_ctx fctx = {
- .pos_type = OBJ_REQUEST_BVECS,
- .pos = (union rbd_img_fill_iter *)bvec_pos,
- .set_pos_fn = set_bvec_pos,
- .count_fn = count_bvecs,
- .copy_fn = copy_bvecs,
- };
-
- return rbd_img_fill_request(img_req, img_extents, num_img_extents,
- &fctx);
-}
-
-static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
- struct ceph_file_extent *img_extents,
- u32 num_img_extents,
- struct bio_vec *bvecs)
-{
- struct ceph_bvec_iter it = {
- .bvecs = bvecs,
- .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
- num_img_extents) },
- };
-
- return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
- &it);
+ return rbd_img_fill_request(img_req, &ex, 1, &iter);
}

static void rbd_img_handle_request_work(struct work_struct *work)
@@ -2789,7 +2563,7 @@ static int rbd_obj_read_object(struct rbd_obj_request *obj_req)

osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
- rbd_osd_setup_data(osd_req, 0);
+ osd_req_op_extent_osd_iter(osd_req, 0, &obj_req->iter);
rbd_osd_format_read(osd_req);

ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
@@ -2823,28 +2597,15 @@ static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
obj_req);

if (!rbd_img_is_write(img_req)) {
- switch (img_req->data_type) {
- case OBJ_REQUEST_BIO:
- ret = __rbd_img_fill_from_bio(child_img_req,
- obj_req->img_extents,
- obj_req->num_img_extents,
- &obj_req->bio_pos);
- break;
- case OBJ_REQUEST_BVECS:
- case OBJ_REQUEST_OWN_BVECS:
- ret = __rbd_img_fill_from_bvecs(child_img_req,
- obj_req->img_extents,
- obj_req->num_img_extents,
- &obj_req->bvec_pos);
- break;
- default:
- BUG();
- }
+ ret = rbd_img_fill_request(child_img_req,
+ obj_req->img_extents,
+ obj_req->num_img_extents,
+ &obj_req->iter);
} else {
- ret = rbd_img_fill_from_bvecs(child_img_req,
- obj_req->img_extents,
- obj_req->num_img_extents,
- obj_req->copyup_bvecs);
+ ret = rbd_img_fill_request(img_req,
+ obj_req->img_extents,
+ obj_req->num_img_extents,
+ &obj_req->copyup_buf->iter);
}
if (ret) {
rbd_img_request_destroy(child_img_req);
@@ -3002,21 +2763,9 @@ static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
return 0;
}

-/*
- * copyup_bvecs pages are never highmem pages
- */
-static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
+static bool is_zero_bvecs(struct ceph_databuf *dbuf, size_t count)
{
- struct ceph_bvec_iter it = {
- .bvecs = bvecs,
- .iter = { .bi_size = bytes },
- };
-
- ceph_bvec_iter_advance_step(&it, bytes, ({
- if (memchr_inv(bvec_virt(&bv), 0, bv.bv_len))
- return false;
- }));
- return true;
+ return iov_iter_is_zero(&dbuf->iter, count);
}

#define MODS_ONLY U32_MAX
@@ -3082,30 +2831,18 @@ static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
return 0;
}

-static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
+static int setup_copyup_buf(struct rbd_obj_request *obj_req, u64 obj_overlap)
{
- u32 i;
-
- rbd_assert(!obj_req->copyup_bvecs);
- obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
- obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
- sizeof(*obj_req->copyup_bvecs),
- GFP_NOIO);
- if (!obj_req->copyup_bvecs)
- return -ENOMEM;
+ struct ceph_databuf *dbuf;

- for (i = 0; i < obj_req->copyup_bvec_count; i++) {
- unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
- struct page *page = alloc_page(GFP_NOIO);
+ rbd_assert(!obj_req->copyup_buf);

- if (!page)
- return -ENOMEM;
-
- bvec_set_page(&obj_req->copyup_bvecs[i], page, len, 0);
- obj_overlap -= len;
- }
+ dbuf = ceph_databuf_alloc(calc_pages_for(0, obj_overlap),
+ obj_overlap, GFP_NOIO);
+ if (!dbuf)
+ return -ENOMEM;

- rbd_assert(!obj_overlap);
+ obj_req->copyup_buf = dbuf;
return 0;
}

@@ -3132,7 +2869,7 @@ static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
}

- ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
+ ret = setup_copyup_buf(obj_req, rbd_obj_img_extents_bytes(obj_req));
if (ret)
return ret;

@@ -3239,7 +2976,7 @@ static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
if (*result)
return true;

- if (is_zero_bvecs(obj_req->copyup_bvecs,
+ if (is_zero_bvecs(obj_req->copyup_buf,
rbd_obj_img_extents_bytes(obj_req))) {
dout("%s %p detected zeros\n", __func__, obj_req);
obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 323e7631c7d8..5d16469a3690 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -81,11 +81,11 @@ static __le32 ceph_flags_sys2wire(struct ceph_mds_client *mdsc, u32 flags)
*/
#define ITER_GET_BVECS_PAGES 64

-static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
- struct bio_vec *bvecs)
+static int __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
+ struct ceph_databuf *dbuf)
{
+ struct bio_vec *bvecs = dbuf->bvec;
size_t size = 0;
- int bvec_idx = 0;

if (maxsize > iov_iter_count(iter))
maxsize = iov_iter_count(iter);
@@ -97,22 +97,25 @@ static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
int idx = 0;

bytes = iov_iter_get_pages2(iter, pages, maxsize - size,
- ITER_GET_BVECS_PAGES, &start);
- if (bytes < 0)
- return size ?: bytes;
+ ITER_GET_BVECS_PAGES, &start);
+ if (bytes < 0) {
+ if (size == 0)
+ return bytes;
+ break;
+ }

- size += bytes;
+ dbuf->length += bytes;

- for ( ; bytes; idx++, bvec_idx++) {
+ while (bytes) {
int len = min_t(int, bytes, PAGE_SIZE - start);

- bvec_set_page(&bvecs[bvec_idx], pages[idx], len, start);
+ bvec_set_page(&bvecs[dbuf->nr_bvec++], pages[idx++], len, start);
bytes -= len;
start = 0;
}
}

- return size;
+ return 0;
}

/*
@@ -123,52 +126,43 @@ static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
* Attempt to get up to @maxsize bytes worth of pages from @iter.
* Return the number of bytes in the created bio_vec array, or an error.
*/
-static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize,
- struct bio_vec **bvecs, int *num_bvecs)
+static struct ceph_databuf *iter_get_bvecs_alloc(struct iov_iter *iter,
+ size_t maxsize, bool write)
{
- struct bio_vec *bv;
+ struct ceph_databuf *dbuf;
size_t orig_count = iov_iter_count(iter);
- ssize_t bytes;
- int npages;
+ int npages, ret;

iov_iter_truncate(iter, maxsize);
npages = iov_iter_npages(iter, INT_MAX);
iov_iter_reexpand(iter, orig_count);

- /*
- * __iter_get_bvecs() may populate only part of the array -- zero it
- * out.
- */
- bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO);
- if (!bv)
- return -ENOMEM;
+ dbuf = ceph_databuf_alloc(npages, 0, GFP_KERNEL);
+ if (!dbuf)
+ return ERR_PTR(-ENOMEM);

- bytes = __iter_get_bvecs(iter, maxsize, bv);
- if (bytes < 0) {
+ ret = __iter_get_bvecs(iter, maxsize, dbuf);
+ if (ret < 0) {
/*
* No pages were pinned -- just free the array.
*/
- kvfree(bv);
- return bytes;
+ ceph_databuf_release(dbuf);
+ return ERR_PTR(ret);
}

- *bvecs = bv;
- *num_bvecs = npages;
- return bytes;
+ iov_iter_bvec(&dbuf->iter, write ? ITER_SOURCE : ITER_DEST,
+ dbuf->bvec, dbuf->nr_bvec, dbuf->length);
+ return dbuf;
}

-static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
+static void ceph_dirty_pages(struct ceph_databuf *dbuf)
{
+ struct bio_vec *bvec = dbuf->bvec;
int i;

- for (i = 0; i < num_bvecs; i++) {
- if (bvecs[i].bv_page) {
- if (should_dirty)
- set_page_dirty_lock(bvecs[i].bv_page);
- put_page(bvecs[i].bv_page);
- }
- }
- kvfree(bvecs);
+ for (i = 0; i < dbuf->nr_bvec; i++)
+ if (bvec[i].bv_page)
+ set_page_dirty_lock(bvec[i].bv_page);
}

/*
@@ -1262,14 +1256,11 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
struct ceph_osd_req_op *op = &req->r_ops[0];
struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
- unsigned int len = osd_data->bvec_pos.iter.bi_size;
- bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);
struct ceph_client *cl = ceph_inode_to_client(inode);
+ size_t len = osd_data->iter.count;
+ bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);

- BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
- BUG_ON(!osd_data->num_bvecs);
-
- doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %u\n", req,
+ doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %zu\n", req,
inode, ceph_vinop(inode), rc, len);

if (rc == -EOLDSNAPC) {
@@ -1291,7 +1282,6 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
if (rc == -ENOENT)
rc = 0;
if (rc >= 0 && len > rc) {
- struct iov_iter i;
int zlen = len - rc;

/*
@@ -1308,10 +1298,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
aio_req->total_len = rc + zlen;
}

- iov_iter_bvec(&i, ITER_DEST, osd_data->bvec_pos.bvecs,
- osd_data->num_bvecs, len);
- iov_iter_advance(&i, rc);
- iov_iter_zero(zlen, &i);
+ iov_iter_advance(&osd_data->iter, rc);
+ iov_iter_zero(zlen, &osd_data->iter);
}
}

@@ -1325,8 +1313,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
req->r_end_latency, len, rc);
}

- put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
- aio_req->should_dirty);
+ if (aio_req->should_dirty)
+ ceph_dirty_pages(osd_data->dbuf);
ceph_osdc_put_request(req);

if (rc < 0)
@@ -1415,9 +1403,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
struct ceph_client_metric *metric = &fsc->mdsc->metric;
struct ceph_vino vino;
struct ceph_osd_request *req;
- struct bio_vec *bvecs;
struct ceph_aio_request *aio_req = NULL;
- int num_pages = 0;
+ struct ceph_databuf *dbuf = NULL;
int flags;
int ret = 0;
struct timespec64 mtime = current_time(inode);
@@ -1453,8 +1440,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,

while (iov_iter_count(iter) > 0) {
u64 size = iov_iter_count(iter);
- ssize_t len;
struct ceph_osd_req_op *op;
+ size_t len;
int readop = sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ;

if (write)
@@ -1476,12 +1463,13 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
break;
}

- len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages);
- if (len < 0) {
+ dbuf = iter_get_bvecs_alloc(iter, size, write);
+ if (IS_ERR(dbuf)) {
ceph_osdc_put_request(req);
- ret = len;
+ ret = PTR_ERR(dbuf);
break;
}
+ len = dbuf->length;
if (len != size)
osd_req_op_extent_update(req, 0, len);

@@ -1516,7 +1504,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
req->r_mtime = mtime;
}

- osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len);
+ osd_req_op_extent_osd_databuf(req, 0, dbuf);
op = &req->r_ops[0];
if (sparse) {
ret = ceph_alloc_sparse_ext_map(op);
@@ -1558,20 +1546,17 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
ret = 0;

if (ret >= 0 && ret < len && pos + ret < size) {
- struct iov_iter i;
int zlen = min_t(size_t, len - ret,
size - pos - ret);

- iov_iter_bvec(&i, ITER_DEST, bvecs, num_pages, len);
- iov_iter_advance(&i, ret);
- iov_iter_zero(zlen, &i);
+ iov_iter_advance(&dbuf->iter, ret);
+ iov_iter_zero(zlen, &dbuf->iter);
ret += zlen;
}
if (ret >= 0)
len = ret;
}

- put_bvecs(bvecs, num_pages, should_dirty);
ceph_osdc_put_request(req);
if (ret < 0)
break;


2023-08-04 13:53:20

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 11/18] ceph: Remove bvec and bio data container types

The CEPH_MSG_DATA_BIO and CEPH_MSG_DATA_BVEC data types are now unused,
so remove them.

Signed-off-by: David Howells <[email protected]>
---
include/linux/ceph/messenger.h | 103 --------------------
include/linux/ceph/osd_client.h | 31 ------
net/ceph/messenger.c | 166 --------------------------------
net/ceph/osd_client.c | 94 ------------------
4 files changed, 394 deletions(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 351d00e9632d..0f4cc6e39da0 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -120,108 +120,15 @@ enum ceph_msg_data_type {
CEPH_MSG_DATA_DATABUF, /* data source/destination is a data buffer */
CEPH_MSG_DATA_PAGES, /* data source/destination is a page array */
CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */
-#ifdef CONFIG_BLOCK
- CEPH_MSG_DATA_BIO, /* data source/destination is a bio list */
-#endif /* CONFIG_BLOCK */
- CEPH_MSG_DATA_BVECS, /* data source/destination is a bio_vec array */
CEPH_MSG_DATA_ITER, /* data source/destination is an iov_iter */
};

-#ifdef CONFIG_BLOCK
-
-struct ceph_bio_iter {
- struct bio *bio;
- struct bvec_iter iter;
-};
-
-#define __ceph_bio_iter_advance_step(it, n, STEP) do { \
- unsigned int __n = (n), __cur_n; \
- \
- while (__n) { \
- BUG_ON(!(it)->iter.bi_size); \
- __cur_n = min((it)->iter.bi_size, __n); \
- (void)(STEP); \
- bio_advance_iter((it)->bio, &(it)->iter, __cur_n); \
- if (!(it)->iter.bi_size && (it)->bio->bi_next) { \
- dout("__ceph_bio_iter_advance_step next bio\n"); \
- (it)->bio = (it)->bio->bi_next; \
- (it)->iter = (it)->bio->bi_iter; \
- } \
- __n -= __cur_n; \
- } \
-} while (0)
-
-/*
- * Advance @it by @n bytes.
- */
-#define ceph_bio_iter_advance(it, n) \
- __ceph_bio_iter_advance_step(it, n, 0)
-
-/*
- * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec.
- */
-#define ceph_bio_iter_advance_step(it, n, BVEC_STEP) \
- __ceph_bio_iter_advance_step(it, n, ({ \
- struct bio_vec bv; \
- struct bvec_iter __cur_iter; \
- \
- __cur_iter = (it)->iter; \
- __cur_iter.bi_size = __cur_n; \
- __bio_for_each_segment(bv, (it)->bio, __cur_iter, __cur_iter) \
- (void)(BVEC_STEP); \
- }))
-
-#endif /* CONFIG_BLOCK */
-
-struct ceph_bvec_iter {
- struct bio_vec *bvecs;
- struct bvec_iter iter;
-};
-
-#define __ceph_bvec_iter_advance_step(it, n, STEP) do { \
- BUG_ON((n) > (it)->iter.bi_size); \
- (void)(STEP); \
- bvec_iter_advance((it)->bvecs, &(it)->iter, (n)); \
-} while (0)
-
-/*
- * Advance @it by @n bytes.
- */
-#define ceph_bvec_iter_advance(it, n) \
- __ceph_bvec_iter_advance_step(it, n, 0)
-
-/*
- * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec.
- */
-#define ceph_bvec_iter_advance_step(it, n, BVEC_STEP) \
- __ceph_bvec_iter_advance_step(it, n, ({ \
- struct bio_vec bv; \
- struct bvec_iter __cur_iter; \
- \
- __cur_iter = (it)->iter; \
- __cur_iter.bi_size = (n); \
- for_each_bvec(bv, (it)->bvecs, __cur_iter, __cur_iter) \
- (void)(BVEC_STEP); \
- }))
-
-#define ceph_bvec_iter_shorten(it, n) do { \
- BUG_ON((n) > (it)->iter.bi_size); \
- (it)->iter.bi_size = (n); \
-} while (0)
-
struct ceph_msg_data {
enum ceph_msg_data_type type;
struct iov_iter iter;
bool release_dbuf;
union {
struct ceph_databuf *dbuf;
-#ifdef CONFIG_BLOCK
- struct {
- struct ceph_bio_iter bio_pos;
- u32 bio_length;
- };
-#endif /* CONFIG_BLOCK */
- struct ceph_bvec_iter bvec_pos;
struct {
struct page **pages;
size_t length; /* total # bytes */
@@ -240,10 +147,6 @@ struct ceph_msg_data_cursor {
int sr_resid; /* residual sparse_read len */
bool need_crc; /* crc update needed */
union {
-#ifdef CONFIG_BLOCK
- struct ceph_bio_iter bio_iter;
-#endif /* CONFIG_BLOCK */
- struct bvec_iter bvec_iter;
struct { /* pages */
unsigned int page_offset; /* offset in page */
unsigned short page_index; /* index in array */
@@ -609,12 +512,6 @@ void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t offset, bool own_pages);
extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist);
-#ifdef CONFIG_BLOCK
-void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
- u32 length);
-#endif /* CONFIG_BLOCK */
-void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
- struct ceph_bvec_iter *bvec_pos);
void ceph_msg_data_add_iter(struct ceph_msg *msg,
struct iov_iter *iter);

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 780bd49d2734..fd91c5d92600 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -106,10 +106,6 @@ enum ceph_osd_data_type {
CEPH_OSD_DATA_TYPE_DATABUF,
CEPH_OSD_DATA_TYPE_PAGES,
CEPH_OSD_DATA_TYPE_PAGELIST,
-#ifdef CONFIG_BLOCK
- CEPH_OSD_DATA_TYPE_BIO,
-#endif /* CONFIG_BLOCK */
- CEPH_OSD_DATA_TYPE_BVECS,
CEPH_OSD_DATA_TYPE_ITER,
};

@@ -125,16 +121,6 @@ struct ceph_osd_data {
bool own_pages;
};
struct ceph_pagelist *pagelist;
-#ifdef CONFIG_BLOCK
- struct {
- struct ceph_bio_iter bio_pos;
- u32 bio_length;
- };
-#endif /* CONFIG_BLOCK */
- struct {
- struct ceph_bvec_iter bvec_pos;
- u32 num_bvecs;
- };
struct iov_iter iter;
};
};
@@ -500,19 +486,6 @@ extern void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *,
extern void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *,
unsigned int which,
struct ceph_pagelist *pagelist);
-#ifdef CONFIG_BLOCK
-void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
- unsigned int which,
- struct ceph_bio_iter *bio_pos,
- u32 bio_length);
-#endif /* CONFIG_BLOCK */
-void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
- unsigned int which,
- struct bio_vec *bvecs, u32 num_bvecs,
- u32 bytes);
-void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
- unsigned int which,
- struct ceph_bvec_iter *bvec_pos);
void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req,
unsigned int which, struct iov_iter *iter);

@@ -522,10 +495,6 @@ void osd_req_op_cls_request_databuf(struct ceph_osd_request *req,
extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
unsigned int which,
struct ceph_pagelist *pagelist);
-void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
- unsigned int which,
- struct bio_vec *bvecs, u32 num_bvecs,
- u32 bytes);
void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_databuf *dbuf);
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 4c8899c26e1e..1ef3576c930d 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -12,9 +12,6 @@
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
-#ifdef CONFIG_BLOCK
-#include <linux/bio.h>
-#endif /* CONFIG_BLOCK */
#include <linux/dns_resolver.h>
#include <net/tcp.h>
#include <trace/events/sock.h>
@@ -714,116 +711,6 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
}
}

-#ifdef CONFIG_BLOCK
-
-/*
- * For a bio data item, a piece is whatever remains of the next
- * entry in the current bio iovec, or the first entry in the next
- * bio in the list.
- */
-static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
- size_t length)
-{
- struct ceph_msg_data *data = cursor->data;
- struct ceph_bio_iter *it = &cursor->bio_iter;
-
- cursor->resid = min_t(size_t, length, data->bio_length);
- *it = data->bio_pos;
- if (cursor->resid < it->iter.bi_size)
- it->iter.bi_size = cursor->resid;
-
- BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
-}
-
-static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
- size_t *page_offset,
- size_t *length)
-{
- struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
- cursor->bio_iter.iter);
-
- *page_offset = bv.bv_offset;
- *length = bv.bv_len;
- return bv.bv_page;
-}
-
-static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
- size_t bytes)
-{
- struct ceph_bio_iter *it = &cursor->bio_iter;
- struct page *page = bio_iter_page(it->bio, it->iter);
-
- BUG_ON(bytes > cursor->resid);
- BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
- cursor->resid -= bytes;
- bio_advance_iter(it->bio, &it->iter, bytes);
-
- if (!cursor->resid)
- return false; /* no more data */
-
- if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done &&
- page == bio_iter_page(it->bio, it->iter)))
- return false; /* more bytes to process in this segment */
-
- if (!it->iter.bi_size) {
- it->bio = it->bio->bi_next;
- it->iter = it->bio->bi_iter;
- if (cursor->resid < it->iter.bi_size)
- it->iter.bi_size = cursor->resid;
- }
-
- BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
- return true;
-}
-#endif /* CONFIG_BLOCK */
-
-static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
- size_t length)
-{
- struct ceph_msg_data *data = cursor->data;
- struct bio_vec *bvecs = data->bvec_pos.bvecs;
-
- cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
- cursor->bvec_iter = data->bvec_pos.iter;
- cursor->bvec_iter.bi_size = cursor->resid;
-
- BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
-}
-
-static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
- size_t *page_offset,
- size_t *length)
-{
- struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
- cursor->bvec_iter);
-
- *page_offset = bv.bv_offset;
- *length = bv.bv_len;
- return bv.bv_page;
-}
-
-static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
- size_t bytes)
-{
- struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
- struct page *page = bvec_iter_page(bvecs, cursor->bvec_iter);
-
- BUG_ON(bytes > cursor->resid);
- BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
- cursor->resid -= bytes;
- bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);
-
- if (!cursor->resid)
- return false; /* no more data */
-
- if (!bytes || (cursor->bvec_iter.bi_bvec_done &&
- page == bvec_iter_page(bvecs, cursor->bvec_iter)))
- return false; /* more bytes to process in this segment */
-
- BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
- return true;
-}
-
/*
* For a page array, a piece comes from the first page in the array
* that has not already been fully consumed.
@@ -1045,14 +932,6 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
case CEPH_MSG_DATA_PAGES:
ceph_msg_data_pages_cursor_init(cursor, length);
break;
-#ifdef CONFIG_BLOCK
- case CEPH_MSG_DATA_BIO:
- ceph_msg_data_bio_cursor_init(cursor, length);
- break;
-#endif /* CONFIG_BLOCK */
- case CEPH_MSG_DATA_BVECS:
- ceph_msg_data_bvecs_cursor_init(cursor, length);
- break;
case CEPH_MSG_DATA_ITER:
ceph_msg_data_iter_cursor_init(cursor, length);
break;
@@ -1095,14 +974,6 @@ struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
case CEPH_MSG_DATA_PAGES:
page = ceph_msg_data_pages_next(cursor, page_offset, length);
break;
-#ifdef CONFIG_BLOCK
- case CEPH_MSG_DATA_BIO:
- page = ceph_msg_data_bio_next(cursor, page_offset, length);
- break;
-#endif /* CONFIG_BLOCK */
- case CEPH_MSG_DATA_BVECS:
- page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
- break;
case CEPH_MSG_DATA_ITER:
page = ceph_msg_data_iter_next(cursor, page_offset, length);
break;
@@ -1136,14 +1007,6 @@ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
case CEPH_MSG_DATA_PAGES:
new_piece = ceph_msg_data_pages_advance(cursor, bytes);
break;
-#ifdef CONFIG_BLOCK
- case CEPH_MSG_DATA_BIO:
- new_piece = ceph_msg_data_bio_advance(cursor, bytes);
- break;
-#endif /* CONFIG_BLOCK */
- case CEPH_MSG_DATA_BVECS:
- new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
- break;
case CEPH_MSG_DATA_ITER:
new_piece = ceph_msg_data_iter_advance(cursor, bytes);
break;
@@ -1936,35 +1799,6 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
}
EXPORT_SYMBOL(ceph_msg_data_add_pagelist);

-#ifdef CONFIG_BLOCK
-void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
- u32 length)
-{
- struct ceph_msg_data *data;
-
- data = ceph_msg_data_add(msg);
- data->type = CEPH_MSG_DATA_BIO;
- data->bio_pos = *bio_pos;
- data->bio_length = length;
-
- msg->data_length += length;
-}
-EXPORT_SYMBOL(ceph_msg_data_add_bio);
-#endif /* CONFIG_BLOCK */
-
-void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
- struct ceph_bvec_iter *bvec_pos)
-{
- struct ceph_msg_data *data;
-
- data = ceph_msg_data_add(msg);
- data->type = CEPH_MSG_DATA_BVECS;
- data->bvec_pos = *bvec_pos;
-
- msg->data_length += bvec_pos->iter.bi_size;
-}
-EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
-
void ceph_msg_data_add_iter(struct ceph_msg *msg,
struct iov_iter *iter)
{
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 02c35785ec28..6bbd9fe780c3 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -9,9 +9,6 @@
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
-#ifdef CONFIG_BLOCK
-#include <linux/bio.h>
-#endif

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
@@ -151,26 +148,6 @@ static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
osd_data->pagelist = pagelist;
}

-#ifdef CONFIG_BLOCK
-static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
- struct ceph_bio_iter *bio_pos,
- u32 bio_length)
-{
- osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
- osd_data->bio_pos = *bio_pos;
- osd_data->bio_length = bio_length;
-}
-#endif /* CONFIG_BLOCK */
-
-static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
- struct ceph_bvec_iter *bvec_pos,
- u32 num_bvecs)
-{
- osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
- osd_data->bvec_pos = *bvec_pos;
- osd_data->num_bvecs = num_bvecs;
-}
-
static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
struct iov_iter *iter)
{
@@ -251,47 +228,6 @@ void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

-#ifdef CONFIG_BLOCK
-void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
- unsigned int which,
- struct ceph_bio_iter *bio_pos,
- u32 bio_length)
-{
- struct ceph_osd_data *osd_data;
-
- osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
- ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
-}
-EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
-#endif /* CONFIG_BLOCK */
-
-void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
- unsigned int which,
- struct bio_vec *bvecs, u32 num_bvecs,
- u32 bytes)
-{
- struct ceph_osd_data *osd_data;
- struct ceph_bvec_iter it = {
- .bvecs = bvecs,
- .iter = { .bi_size = bytes },
- };
-
- osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
- ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
-}
-EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);
-
-void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
- unsigned int which,
- struct ceph_bvec_iter *bvec_pos)
-{
- struct ceph_osd_data *osd_data;
-
- osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
- ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
-}
-EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
-
/**
* osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer
* @osd_req: The request to set up
@@ -357,24 +293,6 @@ static void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
osd_req->r_ops[which].indata_len += length;
}

-void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
- unsigned int which,
- struct bio_vec *bvecs, u32 num_bvecs,
- u32 bytes)
-{
- struct ceph_osd_data *osd_data;
- struct ceph_bvec_iter it = {
- .bvecs = bvecs,
- .iter = { .bi_size = bytes },
- };
-
- osd_data = osd_req_op_data(osd_req, which, cls, request_data);
- ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
- osd_req->r_ops[which].cls.indata_len += bytes;
- osd_req->r_ops[which].indata_len += bytes;
-}
-EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
-
void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_databuf *dbuf)
@@ -395,12 +313,6 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
return osd_data->length;
case CEPH_OSD_DATA_TYPE_PAGELIST:
return (u64)osd_data->pagelist->length;
-#ifdef CONFIG_BLOCK
- case CEPH_OSD_DATA_TYPE_BIO:
- return (u64)osd_data->bio_length;
-#endif /* CONFIG_BLOCK */
- case CEPH_OSD_DATA_TYPE_BVECS:
- return osd_data->bvec_pos.iter.bi_size;
case CEPH_OSD_DATA_TYPE_ITER:
return iov_iter_count(&osd_data->iter);
default:
@@ -1005,12 +917,6 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
BUG_ON(!length);
ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
-#ifdef CONFIG_BLOCK
- } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
- ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
-#endif
- } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
- ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
ceph_msg_data_add_iter(msg, &osd_data->iter);
} else {


2023-08-04 13:54:51

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 09/18] ceph: Convert notify_id_pages to a ceph_databuf

Convert linger->notify_id_pages to a ceph_databuf.

Signed-off-by: David Howells <[email protected]>
---
include/linux/ceph/osd_client.h | 2 +-
net/ceph/osd_client.c | 18 +++++++-----------
2 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 0b02e272acc2..780bd49d2734 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -348,7 +348,7 @@ struct ceph_osd_linger_request {
void *data;

struct ceph_pagelist *request_pl;
- struct page **notify_id_pages;
+ struct ceph_databuf *notify_id_buf;

struct page ***preply_pages;
size_t *preply_len;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index aa9d07221149..02c35785ec28 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -2825,11 +2825,8 @@ static void linger_release(struct kref *kref)
WARN_ON(!list_empty(&lreq->pending_lworks));
WARN_ON(lreq->osd);

- if (lreq->request_pl)
- ceph_pagelist_release(lreq->request_pl);
- if (lreq->notify_id_pages)
- ceph_release_page_vector(lreq->notify_id_pages, 1);
-
+ ceph_pagelist_release(lreq->request_pl);
+ ceph_databuf_release(lreq->notify_id_buf);
ceph_osdc_put_request(lreq->reg_req);
ceph_osdc_put_request(lreq->ping_req);
target_destroy(&lreq->t);
@@ -3210,9 +3207,9 @@ static void send_linger(struct ceph_osd_linger_request *lreq)
refcount_inc(&lreq->request_pl->refcnt);
osd_req_op_notify_init(req, 0, lreq->linger_id,
lreq->request_pl);
- ceph_osd_data_pages_init(
+ ceph_osd_databuf_init(
osd_req_op_data(req, 0, notify, response_data),
- lreq->notify_id_pages, PAGE_SIZE, 0, false, false);
+ lreq->notify_id_buf);
}
dout("lreq %p register\n", lreq);
req->r_callback = linger_commit_cb;
@@ -4995,10 +4992,9 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
}

/* for notify_id */
- lreq->notify_id_pages = ceph_alloc_page_vector(1, GFP_NOIO);
- if (IS_ERR(lreq->notify_id_pages)) {
- ret = PTR_ERR(lreq->notify_id_pages);
- lreq->notify_id_pages = NULL;
+ lreq->notify_id_buf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOIO);
+ if (!lreq->notify_id_buf) {
+ ret = -ENOMEM;
goto out_put_lreq;
}



2023-08-04 13:59:05

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 07/18] ceph: Unexport osd_req_op_cls_request_data_pages()

Unexport osd_req_op_cls_request_data_pages() as it's not used outside of
the file in which it is defined, and it will be replaced.

Signed-off-by: David Howells <[email protected]>
---
include/linux/ceph/osd_client.h | 5 -----
net/ceph/osd_client.c | 3 +--
2 files changed, 1 insertion(+), 7 deletions(-)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 0e008837dac1..e1533f3314ad 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -522,11 +522,6 @@ void osd_req_op_cls_request_databuf(struct ceph_osd_request *req,
extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
unsigned int which,
struct ceph_pagelist *pagelist);
-extern void osd_req_op_cls_request_data_pages(struct ceph_osd_request *,
- unsigned int which,
- struct page **pages, u64 length,
- u32 offset, bool pages_from_pool,
- bool own_pages);
void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
unsigned int which,
struct bio_vec *bvecs, u32 num_bvecs,
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 7ce3aef55755..2ba6f2ce5fb6 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -344,7 +344,7 @@ void osd_req_op_cls_request_data_pagelist(
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

-void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
+static void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
unsigned int which, struct page **pages, u64 length,
u32 offset, bool pages_from_pool, bool own_pages)
{
@@ -356,7 +356,6 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
osd_req->r_ops[which].cls.indata_len += length;
osd_req->r_ops[which].indata_len += length;
}
-EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
unsigned int which,


2023-08-04 14:04:33

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 13/18] ceph: Convert users of ceph_pagelist to ceph_databuf

Convert users of ceph_pagelist to use ceph_databuf instead. ceph_pagelist
is then unused and can be removed.

Signed-off-by: David Howells <[email protected]>
---
fs/ceph/locks.c | 22 +++---
fs/ceph/mds_client.c | 122 +++++++++++++++-----------------
fs/ceph/super.h | 6 +-
include/linux/ceph/osd_client.h | 2 +-
net/ceph/osd_client.c | 90 ++++++++++++-----------
5 files changed, 124 insertions(+), 118 deletions(-)

diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index e07ad29ff8b9..b3c018a8a92f 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -370,8 +370,8 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
}

/*
- * Fills in the passed counter variables, so you can prepare pagelist metadata
- * before calling ceph_encode_locks.
+ * Fills in the passed counter variables, so you can prepare metadata before
+ * calling ceph_encode_locks.
*/
void ceph_count_locks(struct inode *inode, int *fcntl_count, int *flock_count)
{
@@ -481,38 +481,38 @@ int ceph_encode_locks_to_buffer(struct inode *inode,
}

/*
- * Copy the encoded flock and fcntl locks into the pagelist.
+ * Copy the encoded flock and fcntl locks into the data buffer.
* Format is: #fcntl locks, sequential fcntl locks, #flock locks,
* sequential flock locks.
* Returns zero on success.
*/
-int ceph_locks_to_pagelist(struct ceph_filelock *flocks,
- struct ceph_pagelist *pagelist,
+int ceph_locks_to_databuf(struct ceph_filelock *flocks,
+ struct ceph_databuf *dbuf,
int num_fcntl_locks, int num_flock_locks)
{
int err = 0;
__le32 nlocks;

nlocks = cpu_to_le32(num_fcntl_locks);
- err = ceph_pagelist_append(pagelist, &nlocks, sizeof(nlocks));
+ err = ceph_databuf_append(dbuf, &nlocks, sizeof(nlocks));
if (err)
goto out_fail;

if (num_fcntl_locks > 0) {
- err = ceph_pagelist_append(pagelist, flocks,
- num_fcntl_locks * sizeof(*flocks));
+ err = ceph_databuf_append(dbuf, flocks,
+ num_fcntl_locks * sizeof(*flocks));
if (err)
goto out_fail;
}

nlocks = cpu_to_le32(num_flock_locks);
- err = ceph_pagelist_append(pagelist, &nlocks, sizeof(nlocks));
+ err = ceph_databuf_append(dbuf, &nlocks, sizeof(nlocks));
if (err)
goto out_fail;

if (num_flock_locks > 0) {
- err = ceph_pagelist_append(pagelist, &flocks[num_fcntl_locks],
- num_flock_locks * sizeof(*flocks));
+ err = ceph_databuf_append(dbuf, &flocks[num_fcntl_locks],
+ num_flock_locks * sizeof(*flocks));
}
out_fail:
return err;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 85b2f1eccf88..9f5c4f47982e 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -55,7 +55,7 @@
struct ceph_reconnect_state {
struct ceph_mds_session *session;
int nr_caps, nr_realms;
- struct ceph_pagelist *pagelist;
+ struct ceph_databuf *dbuf;
unsigned msg_version;
bool allow_multi;
};
@@ -4244,8 +4244,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
{
struct ceph_msg *reply;
- struct ceph_pagelist *_pagelist;
- struct page *page;
+ struct ceph_databuf *_dbuf;
__le32 *addr;
int err = -ENOMEM;

@@ -4255,9 +4254,9 @@ static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
/* can't handle message that contains both caps and realm */
BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);

- /* pre-allocate new pagelist */
- _pagelist = ceph_pagelist_alloc(GFP_NOFS);
- if (!_pagelist)
+ /* pre-allocate new databuf */
+ _dbuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOFS);
+ if (!_dbuf)
return -ENOMEM;

reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
@@ -4265,28 +4264,27 @@ static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
goto fail_msg;

/* placeholder for nr_caps */
- err = ceph_pagelist_encode_32(_pagelist, 0);
+ err = ceph_databuf_encode_32(_dbuf, 0);
if (err < 0)
goto fail;

if (recon_state->nr_caps) {
/* currently encoding caps */
- err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
+ err = ceph_databuf_encode_32(recon_state->dbuf, 0);
if (err)
goto fail;
} else {
/* placeholder for nr_realms (currently encoding relams) */
- err = ceph_pagelist_encode_32(_pagelist, 0);
+ err = ceph_databuf_encode_32(_dbuf, 0);
if (err < 0)
goto fail;
}

- err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
+ err = ceph_databuf_encode_8(recon_state->dbuf, 1);
if (err)
goto fail;

- page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
- addr = kmap_atomic(page);
+ addr = kmap_ceph_databuf_page(recon_state->dbuf, 0);
if (recon_state->nr_caps) {
/* currently encoding caps */
*addr = cpu_to_le32(recon_state->nr_caps);
@@ -4294,18 +4292,18 @@ static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
/* currently encoding relams */
*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
}
- kunmap_atomic(addr);
+ kunmap_local(addr);

reply->hdr.version = cpu_to_le16(5);
reply->hdr.compat_version = cpu_to_le16(4);

- reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
- ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
+ reply->hdr.data_len = cpu_to_le32(recon_state->dbuf->length);
+ ceph_msg_data_add_databuf(reply, recon_state->dbuf);

ceph_con_send(&recon_state->session->s_con, reply);
- ceph_pagelist_release(recon_state->pagelist);
+ ceph_databuf_release(recon_state->dbuf);

- recon_state->pagelist = _pagelist;
+ recon_state->dbuf = _dbuf;
recon_state->nr_caps = 0;
recon_state->nr_realms = 0;
recon_state->msg_version = 5;
@@ -4313,7 +4311,7 @@ static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
fail:
ceph_msg_put(reply);
fail_msg:
- ceph_pagelist_release(_pagelist);
+ ceph_databuf_release(_dbuf);
return err;
}

@@ -4363,7 +4361,7 @@ static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
} rec;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_reconnect_state *recon_state = arg;
- struct ceph_pagelist *pagelist = recon_state->pagelist;
+ struct ceph_databuf *dbuf = recon_state->dbuf;
struct dentry *dentry;
struct ceph_cap *cap;
char *path;
@@ -4482,7 +4480,7 @@ static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
struct_v = 2;
}
/*
- * number of encoded locks is stable, so copy to pagelist
+ * number of encoded locks is stable, so copy to databuf
*/
struct_len = 2 * sizeof(u32) +
(num_fcntl_locks + num_flock_locks) *
@@ -4496,41 +4494,42 @@ static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)

total_len += struct_len;

- if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
+ if (dbuf->length + total_len > RECONNECT_MAX_SIZE) {
err = send_reconnect_partial(recon_state);
if (err)
goto out_freeflocks;
- pagelist = recon_state->pagelist;
+ dbuf = recon_state->dbuf;
}

- err = ceph_pagelist_reserve(pagelist, total_len);
+ err = ceph_databuf_reserve(dbuf, total_len, GFP_NOFS);
if (err)
goto out_freeflocks;

- ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
+ ceph_databuf_encode_64(dbuf, ceph_ino(inode));
if (recon_state->msg_version >= 3) {
- ceph_pagelist_encode_8(pagelist, struct_v);
- ceph_pagelist_encode_8(pagelist, 1);
- ceph_pagelist_encode_32(pagelist, struct_len);
+ ceph_databuf_encode_8(dbuf, struct_v);
+ ceph_databuf_encode_8(dbuf, 1);
+ ceph_databuf_encode_32(dbuf, struct_len);
}
- ceph_pagelist_encode_string(pagelist, path, pathlen);
- ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
- ceph_locks_to_pagelist(flocks, pagelist,
- num_fcntl_locks, num_flock_locks);
+ ceph_databuf_encode_string(dbuf, path, pathlen);
+ ceph_databuf_append(dbuf, &rec, sizeof(rec.v2));
+ ceph_locks_to_databuf(flocks, dbuf,
+ num_fcntl_locks, num_flock_locks);
if (struct_v >= 2)
- ceph_pagelist_encode_64(pagelist, snap_follows);
+ ceph_databuf_encode_64(dbuf, snap_follows);
out_freeflocks:
kfree(flocks);
} else {
- err = ceph_pagelist_reserve(pagelist,
- sizeof(u64) + sizeof(u32) +
- pathlen + sizeof(rec.v1));
+ err = ceph_databuf_reserve(dbuf,
+ sizeof(u64) + sizeof(u32) +
+ pathlen + sizeof(rec.v1),
+ GFP_NOFS);
if (err)
goto out_err;

- ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
- ceph_pagelist_encode_string(pagelist, path, pathlen);
- ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
+ ceph_databuf_encode_64(dbuf, ceph_ino(inode));
+ ceph_databuf_encode_string(dbuf, path, pathlen);
+ ceph_databuf_append(dbuf, &rec, sizeof(rec.v1));
}

out_err:
@@ -4544,12 +4543,12 @@ static int encode_snap_realms(struct ceph_mds_client *mdsc,
struct ceph_reconnect_state *recon_state)
{
struct rb_node *p;
- struct ceph_pagelist *pagelist = recon_state->pagelist;
+ struct ceph_databuf *dbuf = recon_state->dbuf;
struct ceph_client *cl = mdsc->fsc->client;
int err = 0;

if (recon_state->msg_version >= 4) {
- err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
+ err = ceph_databuf_encode_32(dbuf, mdsc->num_snap_realms);
if (err < 0)
goto fail;
}
@@ -4568,20 +4567,20 @@ static int encode_snap_realms(struct ceph_mds_client *mdsc,
size_t need = sizeof(u8) * 2 + sizeof(u32) +
sizeof(sr_rec);

- if (pagelist->length + need > RECONNECT_MAX_SIZE) {
+ if (dbuf->length + need > RECONNECT_MAX_SIZE) {
err = send_reconnect_partial(recon_state);
if (err)
goto fail;
- pagelist = recon_state->pagelist;
+ dbuf = recon_state->dbuf;
}

- err = ceph_pagelist_reserve(pagelist, need);
+ err = ceph_databuf_reserve(dbuf, need, GFP_NOFS);
if (err)
goto fail;

- ceph_pagelist_encode_8(pagelist, 1);
- ceph_pagelist_encode_8(pagelist, 1);
- ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
+ ceph_databuf_encode_8(dbuf, 1);
+ ceph_databuf_encode_8(dbuf, 1);
+ ceph_databuf_encode_32(dbuf, sizeof(sr_rec));
}

doutc(cl, " adding snap realm %llx seq %lld parent %llx\n",
@@ -4590,7 +4589,7 @@ static int encode_snap_realms(struct ceph_mds_client *mdsc,
sr_rec.seq = cpu_to_le64(realm->seq);
sr_rec.parent = cpu_to_le64(realm->parent_ino);

- err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
+ err = ceph_databuf_append(dbuf, &sr_rec, sizeof(sr_rec));
if (err)
goto fail;

@@ -4625,9 +4624,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,

pr_info_client(cl, "mds%d reconnect start\n", mds);

- recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
- if (!recon_state.pagelist)
- goto fail_nopagelist;
+ recon_state.dbuf = ceph_databuf_alloc(1, 0, GFP_NOFS);
+ if (!recon_state.dbuf)
+ goto fail_nodatabuf;

reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
if (!reply)
@@ -4675,7 +4674,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
down_read(&mdsc->snap_rwsem);

/* placeholder for nr_caps */
- err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
+ err = ceph_databuf_encode_32(recon_state.dbuf, 0);
if (err)
goto fail;

@@ -4700,7 +4699,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
/* check if all realms can be encoded into current message */
if (mdsc->num_snap_realms) {
size_t total_len =
- recon_state.pagelist->length +
+ recon_state.dbuf->length +
mdsc->num_snap_realms *
sizeof(struct ceph_mds_snaprealm_reconnect);
if (recon_state.msg_version >= 4) {
@@ -4729,31 +4728,28 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
goto fail;

if (recon_state.msg_version >= 5) {
- err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
+ err = ceph_databuf_encode_8(recon_state.dbuf, 0);
if (err < 0)
goto fail;
}

if (recon_state.nr_caps || recon_state.nr_realms) {
- struct page *page =
- list_first_entry(&recon_state.pagelist->head,
- struct page, lru);
- __le32 *addr = kmap_atomic(page);
+ __le32 *addr = kmap_ceph_databuf_page(recon_state.dbuf, 0);
if (recon_state.nr_caps) {
WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
*addr = cpu_to_le32(recon_state.nr_caps);
} else if (recon_state.msg_version >= 4) {
*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
}
- kunmap_atomic(addr);
+ kunmap_local(addr);
}

reply->hdr.version = cpu_to_le16(recon_state.msg_version);
if (recon_state.msg_version >= 4)
reply->hdr.compat_version = cpu_to_le16(4);

- reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
- ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
+ reply->hdr.data_len = cpu_to_le32(recon_state.dbuf->length);
+ ceph_msg_data_add_databuf(reply, recon_state.dbuf);

ceph_con_send(&session->s_con, reply);

@@ -4764,7 +4760,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
mutex_unlock(&mdsc->mutex);

up_read(&mdsc->snap_rwsem);
- ceph_pagelist_release(recon_state.pagelist);
+ ceph_databuf_release(recon_state.dbuf);
return;

fail:
@@ -4772,8 +4768,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
up_read(&mdsc->snap_rwsem);
mutex_unlock(&session->s_mutex);
fail_nomsg:
- ceph_pagelist_release(recon_state.pagelist);
-fail_nopagelist:
+ ceph_databuf_release(recon_state.dbuf);
+fail_nodatabuf:
pr_err_client(cl, "error %d preparing reconnect for mds%d\n",
err, mds);
return;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 681e634052b1..169d88725209 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -1358,9 +1358,9 @@ extern int ceph_encode_locks_to_buffer(struct inode *inode,
struct ceph_filelock *flocks,
int num_fcntl_locks,
int num_flock_locks);
-extern int ceph_locks_to_pagelist(struct ceph_filelock *flocks,
- struct ceph_pagelist *pagelist,
- int num_fcntl_locks, int num_flock_locks);
+extern int ceph_locks_to_databuf(struct ceph_filelock *flocks,
+ struct ceph_databuf *dbuf,
+ int num_fcntl_locks, int num_flock_locks);

/* debugfs.c */
extern void ceph_fs_debugfs_init(struct ceph_fs_client *client);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index fec78550d5ce..82c1c325861d 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -333,7 +333,7 @@ struct ceph_osd_linger_request {
rados_watcherrcb_t errcb;
void *data;

- struct ceph_pagelist *request_pl;
+ struct ceph_databuf *request_pl;
struct ceph_databuf *notify_id_buf;

struct page ***preply_pages;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index c83ae9bb335e..c4486799f54b 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -255,6 +255,16 @@ void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_extent_osd_iter);

+static void osd_req_op_cls_request_info_databuf(struct ceph_osd_request *osd_req,
+ unsigned int which,
+ struct ceph_databuf *dbuf)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, request_info);
+ ceph_osd_databuf_init(osd_data, dbuf);
+}
+
static void osd_req_op_cls_request_info_pagelist(
struct ceph_osd_request *osd_req,
unsigned int which, struct ceph_pagelist *pagelist)
@@ -779,41 +789,41 @@ int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
const char *class, const char *method)
{
struct ceph_osd_req_op *op;
- struct ceph_pagelist *pagelist;
+ struct ceph_databuf *databuf;
size_t payload_len = 0;
size_t size;
int ret;

op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);

- pagelist = ceph_pagelist_alloc(GFP_NOFS);
- if (!pagelist)
+ databuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOFS);
+ if (!databuf)
return -ENOMEM;

op->cls.class_name = class;
size = strlen(class);
BUG_ON(size > (size_t) U8_MAX);
op->cls.class_len = size;
- ret = ceph_pagelist_append(pagelist, class, size);
+ ret = ceph_databuf_append(databuf, class, size);
if (ret)
- goto err_pagelist_free;
+ goto err_databuf_free;
payload_len += size;

op->cls.method_name = method;
size = strlen(method);
BUG_ON(size > (size_t) U8_MAX);
op->cls.method_len = size;
- ret = ceph_pagelist_append(pagelist, method, size);
+ ret = ceph_databuf_append(databuf, method, size);
if (ret)
- goto err_pagelist_free;
+ goto err_databuf_free;
payload_len += size;

- osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
+ osd_req_op_cls_request_info_databuf(osd_req, which, databuf);
op->indata_len = payload_len;
return 0;

-err_pagelist_free:
- ceph_pagelist_release(pagelist);
+err_databuf_free:
+ ceph_databuf_release(databuf);
return ret;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
@@ -824,37 +834,37 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
{
struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
opcode, 0);
- struct ceph_pagelist *pagelist;
+ struct ceph_databuf *databuf;
size_t payload_len;
int ret;

BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

- pagelist = ceph_pagelist_alloc(GFP_NOFS);
- if (!pagelist)
+ databuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOFS);
+ if (!databuf)
return -ENOMEM;

payload_len = strlen(name);
op->xattr.name_len = payload_len;
- ret = ceph_pagelist_append(pagelist, name, payload_len);
+ ret = ceph_databuf_append(databuf, name, payload_len);
if (ret)
- goto err_pagelist_free;
+ goto err_databuf_free;

op->xattr.value_len = size;
- ret = ceph_pagelist_append(pagelist, value, size);
+ ret = ceph_databuf_append(databuf, value, size);
if (ret)
- goto err_pagelist_free;
+ goto err_databuf_free;
payload_len += size;

op->xattr.cmp_op = cmp_op;
op->xattr.cmp_mode = cmp_mode;

- ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
+ ceph_osd_databuf_init(&op->xattr.osd_data, databuf);
op->indata_len = payload_len;
return 0;

-err_pagelist_free:
- ceph_pagelist_release(pagelist);
+err_databuf_free:
+ ceph_databuf_release(databuf);
return ret;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);
@@ -878,14 +888,14 @@ static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
* encoded in @request_pl
*/
static void osd_req_op_notify_init(struct ceph_osd_request *req, int which,
- u64 cookie, struct ceph_pagelist *request_pl)
+ u64 cookie, struct ceph_databuf *request_pl)
{
struct ceph_osd_req_op *op;

op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
op->notify.cookie = cookie;

- ceph_osd_data_pagelist_init(&op->notify.request_data, request_pl);
+ ceph_osd_databuf_init(&op->notify.request_data, request_pl);
op->indata_len = request_pl->length;
}

@@ -2741,7 +2751,7 @@ static void linger_release(struct kref *kref)
WARN_ON(!list_empty(&lreq->pending_lworks));
WARN_ON(lreq->osd);

- ceph_pagelist_release(lreq->request_pl);
+ ceph_databuf_release(lreq->request_pl);
ceph_databuf_release(lreq->notify_id_buf);
ceph_osdc_put_request(lreq->reg_req);
ceph_osdc_put_request(lreq->ping_req);
@@ -3030,7 +3040,7 @@ static void linger_commit_cb(struct ceph_osd_request *req)
void *p;

WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
- osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
+ osd_data->type != CEPH_OSD_DATA_TYPE_DATABUF);

p = kmap_ceph_databuf_page(osd_data->dbuf, 0);

@@ -4802,30 +4812,30 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
u32 payload_len)
{
struct ceph_osd_req_op *op;
- struct ceph_pagelist *pl;
+ struct ceph_databuf *dbuf;
int ret;

op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);

- pl = ceph_pagelist_alloc(GFP_NOIO);
- if (!pl)
+ dbuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOIO);
+ if (!dbuf)
return -ENOMEM;

- ret = ceph_pagelist_encode_64(pl, notify_id);
- ret |= ceph_pagelist_encode_64(pl, cookie);
+ ret = ceph_databuf_encode_64(dbuf, notify_id);
+ ret |= ceph_databuf_encode_64(dbuf, cookie);
if (payload) {
- ret |= ceph_pagelist_encode_32(pl, payload_len);
- ret |= ceph_pagelist_append(pl, payload, payload_len);
+ ret |= ceph_databuf_encode_32(dbuf, payload_len);
+ ret |= ceph_databuf_append(dbuf, payload, payload_len);
} else {
- ret |= ceph_pagelist_encode_32(pl, 0);
+ ret |= ceph_databuf_encode_32(dbuf, 0);
}
if (ret) {
- ceph_pagelist_release(pl);
+ ceph_databuf_release(dbuf);
return -ENOMEM;
}

- ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
- op->indata_len = pl->length;
+ ceph_osd_databuf_init(&op->notify_ack.request_data, dbuf);
+ op->indata_len = dbuf->length;
return 0;
}

@@ -4896,16 +4906,16 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
if (!lreq)
return -ENOMEM;

- lreq->request_pl = ceph_pagelist_alloc(GFP_NOIO);
+ lreq->request_pl = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOIO);
if (!lreq->request_pl) {
ret = -ENOMEM;
goto out_put_lreq;
}

- ret = ceph_pagelist_encode_32(lreq->request_pl, 1); /* prot_ver */
- ret |= ceph_pagelist_encode_32(lreq->request_pl, timeout);
- ret |= ceph_pagelist_encode_32(lreq->request_pl, payload_len);
- ret |= ceph_pagelist_append(lreq->request_pl, payload, payload_len);
+ ret = ceph_databuf_encode_32(lreq->request_pl, 1); /* prot_ver */
+ ret |= ceph_databuf_encode_32(lreq->request_pl, timeout);
+ ret |= ceph_databuf_encode_32(lreq->request_pl, payload_len);
+ ret |= ceph_databuf_append(lreq->request_pl, payload, payload_len);
if (ret) {
ret = -ENOMEM;
goto out_put_lreq;


2023-08-04 14:05:50

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 04/18] ceph: Convert ceph_mds_request::r_pagelist to a databuf

Convert ceph_mds_request::r_pagelist to a databuf, along with the stuff
that uses it such as setxattr ops.

Signed-off-by: David Howells <[email protected]>
---
fs/ceph/acl.c | 39 ++++++++++----------
fs/ceph/file.c | 12 ++++---
fs/ceph/inode.c | 85 +++++++++++++++++++-------------------------
fs/ceph/mds_client.c | 11 +++---
fs/ceph/mds_client.h | 2 +-
fs/ceph/super.h | 2 +-
fs/ceph/xattr.c | 67 +++++++++++++++-------------------
7 files changed, 96 insertions(+), 122 deletions(-)

diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 32b26deb1741..49ec339f5783 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -171,7 +171,7 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
{
struct posix_acl *acl, *default_acl;
size_t val_size1 = 0, val_size2 = 0;
- struct ceph_pagelist *pagelist = NULL;
+ struct ceph_databuf *dbuf = NULL;
void *tmp_buf = NULL;
int err;

@@ -201,58 +201,55 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL);
if (!tmp_buf)
goto out_err;
- pagelist = ceph_pagelist_alloc(GFP_KERNEL);
- if (!pagelist)
+ dbuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_KERNEL);
+ if (!dbuf)
goto out_err;

- err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
- if (err)
- goto out_err;
-
- ceph_pagelist_encode_32(pagelist, acl && default_acl ? 2 : 1);
+ ceph_databuf_encode_32(dbuf, acl && default_acl ? 2 : 1);

if (acl) {
size_t len = strlen(XATTR_NAME_POSIX_ACL_ACCESS);
- err = ceph_pagelist_reserve(pagelist, len + val_size1 + 8);
+ err = ceph_databuf_reserve(dbuf, len + val_size1 + 8,
+ GFP_KERNEL);
if (err)
goto out_err;
- ceph_pagelist_encode_string(pagelist, XATTR_NAME_POSIX_ACL_ACCESS,
- len);
+ ceph_databuf_encode_string(dbuf, XATTR_NAME_POSIX_ACL_ACCESS,
+ len);
err = posix_acl_to_xattr(&init_user_ns, acl,
tmp_buf, val_size1);
if (err < 0)
goto out_err;
- ceph_pagelist_encode_32(pagelist, val_size1);
- ceph_pagelist_append(pagelist, tmp_buf, val_size1);
+ ceph_databuf_encode_32(dbuf, val_size1);
+ ceph_databuf_append(dbuf, tmp_buf, val_size1);
}
if (default_acl) {
size_t len = strlen(XATTR_NAME_POSIX_ACL_DEFAULT);
- err = ceph_pagelist_reserve(pagelist, len + val_size2 + 8);
+ err = ceph_databuf_reserve(dbuf, len + val_size2 + 8,
+ GFP_KERNEL);
if (err)
goto out_err;
- ceph_pagelist_encode_string(pagelist,
- XATTR_NAME_POSIX_ACL_DEFAULT, len);
+ ceph_databuf_encode_string(dbuf,
+ XATTR_NAME_POSIX_ACL_DEFAULT, len);
err = posix_acl_to_xattr(&init_user_ns, default_acl,
tmp_buf, val_size2);
if (err < 0)
goto out_err;
- ceph_pagelist_encode_32(pagelist, val_size2);
- ceph_pagelist_append(pagelist, tmp_buf, val_size2);
+ ceph_databuf_encode_32(dbuf, val_size2);
+ ceph_databuf_append(dbuf, tmp_buf, val_size2);
}

kfree(tmp_buf);

as_ctx->acl = acl;
as_ctx->default_acl = default_acl;
- as_ctx->pagelist = pagelist;
+ as_ctx->dbuf = dbuf;
return 0;

out_err:
posix_acl_release(acl);
posix_acl_release(default_acl);
kfree(tmp_buf);
- if (pagelist)
- ceph_pagelist_release(pagelist);
+ ceph_databuf_release(dbuf);
return err;
}

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 7470daafe595..323e7631c7d8 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -647,9 +647,9 @@ static int ceph_finish_async_create(struct inode *dir, struct inode *inode,
iinfo.change_attr = 1;
ceph_encode_timespec64(&iinfo.btime, &now);

- if (req->r_pagelist) {
- iinfo.xattr_len = req->r_pagelist->length;
- iinfo.xattr_data = req->r_pagelist->mapped_tail;
+ if (req->r_dbuf) {
+ iinfo.xattr_len = req->r_dbuf->length;
+ iinfo.xattr_data = kmap_ceph_databuf_page(req->r_dbuf, 0);
} else {
/* fake it */
iinfo.xattr_len = ARRAY_SIZE(xattr_buf);
@@ -695,6 +695,8 @@ static int ceph_finish_async_create(struct inode *dir, struct inode *inode,
ret = ceph_fill_inode(inode, NULL, &iinfo, NULL, req->r_session,
req->r_fmode, NULL);
up_read(&mdsc->snap_rwsem);
+ if (req->r_dbuf)
+ kunmap_local(iinfo.xattr_data);
if (ret) {
doutc(cl, "failed to fill inode: %d\n", ret);
ceph_dir_clear_complete(dir);
@@ -781,8 +783,8 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
goto out_ctx;
}
/* Async create can't handle more than a page of xattrs */
- if (as_ctx.pagelist &&
- !list_is_singular(&as_ctx.pagelist->head))
+ if (as_ctx.dbuf &&
+ as_ctx.dbuf->nr_bvec > 1)
try_async = false;
} else if (!d_in_lookup(dentry)) {
/* If it's not being looked up, it's negative */
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 3ff4f57f223f..f1c455fced6f 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -110,9 +110,9 @@ struct inode *ceph_new_inode(struct inode *dir, struct dentry *dentry,

void ceph_as_ctx_to_req(struct ceph_mds_request *req, struct ceph_acl_sec_ctx *as_ctx)
{
- if (as_ctx->pagelist) {
- req->r_pagelist = as_ctx->pagelist;
- as_ctx->pagelist = NULL;
+ if (as_ctx->dbuf) {
+ req->r_dbuf = as_ctx->dbuf;
+ as_ctx->dbuf = NULL;
}
ceph_fscrypt_as_ctx_to_req(req, as_ctx);
}
@@ -2343,11 +2343,10 @@ static int fill_fscrypt_truncate(struct inode *inode,
int boff = attr->ia_size % CEPH_FSCRYPT_BLOCK_SIZE;
loff_t pos, orig_pos = round_down(attr->ia_size, CEPH_FSCRYPT_BLOCK_SIZE);
u64 block = orig_pos >> CEPH_FSCRYPT_BLOCK_SHIFT;
- struct ceph_pagelist *pagelist = NULL;
- struct kvec iov = {0};
+ struct ceph_databuf *dbuf = NULL;
struct iov_iter iter;
- struct page *page = NULL;
- struct ceph_fscrypt_truncate_size_header header;
+ struct ceph_fscrypt_truncate_size_header *header;
+ void *p;
int retry_op = 0;
int len = CEPH_FSCRYPT_BLOCK_SIZE;
loff_t i_size = i_size_read(inode);
@@ -2373,37 +2372,35 @@ static int fill_fscrypt_truncate(struct inode *inode,
goto out;
}

- page = __page_cache_alloc(GFP_KERNEL);
- if (page == NULL) {
- ret = -ENOMEM;
+ ret = -ENOMEM;
+ dbuf = ceph_databuf_alloc(2, 0, GFP_KERNEL);
+ if (!dbuf)
goto out;
- }

- pagelist = ceph_pagelist_alloc(GFP_KERNEL);
- if (!pagelist) {
- ret = -ENOMEM;
+ if (ceph_databuf_insert_frag(dbuf, 0, sizeof(*header), GFP_KERNEL) < 0)
+ goto out;
+ if (ceph_databuf_insert_frag(dbuf, 1, PAGE_SIZE, GFP_KERNEL) < 0)
goto out;
- }

- iov.iov_base = kmap_local_page(page);
- iov.iov_len = len;
- iov_iter_kvec(&iter, READ, &iov, 1, len);
+ iov_iter_bvec(&iter, ITER_DEST, &dbuf->bvec[1], 1, len);

pos = orig_pos;
ret = __ceph_sync_read(inode, &pos, &iter, &retry_op, &objver);
if (ret < 0)
goto out;

+ header = kmap_ceph_databuf_page(dbuf, 0);
+
/* Insert the header first */
- header.ver = 1;
- header.compat = 1;
- header.change_attr = cpu_to_le64(inode_peek_iversion_raw(inode));
+ header->ver = 1;
+ header->compat = 1;
+ header->change_attr = cpu_to_le64(inode_peek_iversion_raw(inode));

/*
* Always set the block_size to CEPH_FSCRYPT_BLOCK_SIZE,
* because in MDS it may need this to do the truncate.
*/
- header.block_size = cpu_to_le32(CEPH_FSCRYPT_BLOCK_SIZE);
+ header->block_size = cpu_to_le32(CEPH_FSCRYPT_BLOCK_SIZE);

/*
* If we hit a hole here, we should just skip filling
@@ -2418,51 +2415,41 @@ static int fill_fscrypt_truncate(struct inode *inode,
if (!objver) {
doutc(cl, "hit hole, ppos %lld < size %lld\n", pos, i_size);

- header.data_len = cpu_to_le32(8 + 8 + 4);
- header.file_offset = 0;
+ header->data_len = cpu_to_le32(8 + 8 + 4);
+ header->file_offset = 0;
ret = 0;
} else {
- header.data_len = cpu_to_le32(8 + 8 + 4 + CEPH_FSCRYPT_BLOCK_SIZE);
- header.file_offset = cpu_to_le64(orig_pos);
+ header->data_len = cpu_to_le32(8 + 8 + 4 + CEPH_FSCRYPT_BLOCK_SIZE);
+ header->file_offset = cpu_to_le64(orig_pos);

doutc(cl, "encrypt block boff/bsize %d/%lu\n", boff,
CEPH_FSCRYPT_BLOCK_SIZE);

/* truncate and zero out the extra contents for the last block */
- memset(iov.iov_base + boff, 0, PAGE_SIZE - boff);
+ p = kmap_ceph_databuf_page(dbuf, 1);
+ memset(p + boff, 0, PAGE_SIZE - boff);
+ kunmap_local(p);

/* encrypt the last block */
- ret = ceph_fscrypt_encrypt_block_inplace(inode, page,
- CEPH_FSCRYPT_BLOCK_SIZE,
- 0, block,
- GFP_KERNEL);
+ ret = ceph_fscrypt_encrypt_block_inplace(
+ inode, ceph_databuf_page(dbuf, 1),
+ CEPH_FSCRYPT_BLOCK_SIZE, 0, block, GFP_KERNEL);
if (ret)
goto out;
}

- /* Insert the header */
- ret = ceph_pagelist_append(pagelist, &header, sizeof(header));
- if (ret)
- goto out;
+ dbuf->length = sizeof(*header);
+ if (header->block_size)
+ dbuf->length += CEPH_FSCRYPT_BLOCK_SIZE;

- if (header.block_size) {
- /* Append the last block contents to pagelist */
- ret = ceph_pagelist_append(pagelist, iov.iov_base,
- CEPH_FSCRYPT_BLOCK_SIZE);
- if (ret)
- goto out;
- }
- req->r_pagelist = pagelist;
+ req->r_dbuf = dbuf;
out:
doutc(cl, "%p %llx.%llx size dropping cap refs on %s\n", inode,
ceph_vinop(inode), ceph_cap_string(got));
ceph_put_cap_refs(ci, got);
- if (iov.iov_base)
- kunmap_local(iov.iov_base);
- if (page)
- __free_pages(page, 0);
- if (ret && pagelist)
- ceph_pagelist_release(pagelist);
+ kunmap_local(header);
+ if (ret)
+ ceph_databuf_release(dbuf);
return ret;
}

diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 9aae39289b43..85b2f1eccf88 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1121,8 +1121,7 @@ void ceph_mdsc_release_request(struct kref *kref)
kfree(req->r_path1);
kfree(req->r_path2);
put_cred(req->r_cred);
- if (req->r_pagelist)
- ceph_pagelist_release(req->r_pagelist);
+ ceph_databuf_release(req->r_dbuf);
kfree(req->r_fscrypt_auth);
kfree(req->r_altname);
put_request_session(req);
@@ -3108,10 +3107,10 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

- if (req->r_pagelist) {
- struct ceph_pagelist *pagelist = req->r_pagelist;
- ceph_msg_data_add_pagelist(msg, pagelist);
- msg->hdr.data_len = cpu_to_le32(pagelist->length);
+ if (req->r_dbuf) {
+ struct ceph_databuf *dbuf = req->r_dbuf;
+ ceph_msg_data_add_databuf(msg, dbuf);
+ msg->hdr.data_len = cpu_to_le32(dbuf->length);
} else {
msg->hdr.data_len = 0;
}
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 717a7399bacb..ab1abc38911b 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -307,7 +307,7 @@ struct ceph_mds_request {
u32 r_direct_hash; /* choose dir frag based on this dentry hash */

/* data payload is used for xattr ops */
- struct ceph_pagelist *r_pagelist;
+ struct ceph_databuf *r_dbuf;

/* what caps shall we drop? */
int r_inode_drop, r_inode_unless;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 4e78de1be23e..681e634052b1 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -1139,7 +1139,7 @@ struct ceph_acl_sec_ctx {
#ifdef CONFIG_FS_ENCRYPTION
struct ceph_fscrypt_auth *fscrypt_auth;
#endif
- struct ceph_pagelist *pagelist;
+ struct ceph_databuf *dbuf;
};

#ifdef CONFIG_SECURITY
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index d4624f56606d..ca3ec5dd0382 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -1113,17 +1113,17 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
struct ceph_mds_request *req;
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_osd_client *osdc = &fsc->client->osdc;
- struct ceph_pagelist *pagelist = NULL;
+ struct ceph_databuf *dbuf = NULL;
int op = CEPH_MDS_OP_SETXATTR;
int err;

if (size > 0) {
- /* copy value into pagelist */
- pagelist = ceph_pagelist_alloc(GFP_NOFS);
- if (!pagelist)
+ /* copy value into dbuf */
+ dbuf = ceph_databuf_alloc(1, size, GFP_NOFS);
+ if (!dbuf)
return -ENOMEM;

- err = ceph_pagelist_append(pagelist, value, size);
+ err = ceph_databuf_append(dbuf, value, size);
if (err)
goto out;
} else if (!value) {
@@ -1153,8 +1153,8 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
req->r_args.setxattr.flags = cpu_to_le32(flags);
req->r_args.setxattr.osdmap_epoch =
cpu_to_le32(osdc->osdmap->epoch);
- req->r_pagelist = pagelist;
- pagelist = NULL;
+ req->r_dbuf = dbuf;
+ dbuf = NULL;
}

req->r_inode = inode;
@@ -1168,8 +1168,7 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
doutc(cl, "xattr.ver (after): %lld\n", ci->i_xattrs.version);

out:
- if (pagelist)
- ceph_pagelist_release(pagelist);
+ ceph_databuf_release(dbuf);
return err;
}

@@ -1376,7 +1375,7 @@ bool ceph_security_xattr_deadlock(struct inode *in)
int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
struct ceph_acl_sec_ctx *as_ctx)
{
- struct ceph_pagelist *pagelist = as_ctx->pagelist;
+ struct ceph_databuf *dbuf = as_ctx->dbuf;
const char *name;
size_t name_len;
int err;
@@ -1391,14 +1390,11 @@ int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
}

err = -ENOMEM;
- if (!pagelist) {
- pagelist = ceph_pagelist_alloc(GFP_KERNEL);
- if (!pagelist)
+ if (!dbuf) {
+ dbuf = ceph_databuf_alloc(0, PAGE_SIZE, GFP_KERNEL);
+ if (!dbuf)
goto out;
- err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
- if (err)
- goto out;
- ceph_pagelist_encode_32(pagelist, 1);
+ ceph_databuf_encode_32(dbuf, 1);
}

/*
@@ -1407,37 +1403,31 @@ int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
* dentry_init_security hook.
*/
name_len = strlen(name);
- err = ceph_pagelist_reserve(pagelist,
- 4 * 2 + name_len + as_ctx->sec_ctxlen);
+ err = ceph_databuf_reserve(dbuf, 4 * 2 + name_len + as_ctx->sec_ctxlen,
+ GFP_KERNEL);
if (err)
goto out;

- if (as_ctx->pagelist) {
+ if (as_ctx->dbuf) {
/* update count of KV pairs */
- BUG_ON(pagelist->length <= sizeof(__le32));
- if (list_is_singular(&pagelist->head)) {
- le32_add_cpu((__le32*)pagelist->mapped_tail, 1);
- } else {
- struct page *page = list_first_entry(&pagelist->head,
- struct page, lru);
- void *addr = kmap_atomic(page);
- le32_add_cpu((__le32*)addr, 1);
- kunmap_atomic(addr);
- }
+ __le32 *addr = kmap_ceph_databuf_page(dbuf, 0);
+ BUG_ON(dbuf->length <= sizeof(__le32));
+ le32_add_cpu(addr, 1);
+ kunmap_local(addr);
} else {
- as_ctx->pagelist = pagelist;
+ as_ctx->dbuf = dbuf;
}

- ceph_pagelist_encode_32(pagelist, name_len);
- ceph_pagelist_append(pagelist, name, name_len);
+ ceph_databuf_encode_32(dbuf, name_len);
+ ceph_databuf_append(dbuf, name, name_len);

- ceph_pagelist_encode_32(pagelist, as_ctx->sec_ctxlen);
- ceph_pagelist_append(pagelist, as_ctx->sec_ctx, as_ctx->sec_ctxlen);
+ ceph_databuf_encode_32(dbuf, as_ctx->sec_ctxlen);
+ ceph_databuf_append(dbuf, as_ctx->sec_ctx, as_ctx->sec_ctxlen);

err = 0;
out:
- if (pagelist && !as_ctx->pagelist)
- ceph_pagelist_release(pagelist);
+ if (dbuf && !as_ctx->dbuf)
+ ceph_databuf_release(dbuf);
return err;
}
#endif /* CONFIG_CEPH_FS_SECURITY_LABEL */
@@ -1455,8 +1445,7 @@ void ceph_release_acl_sec_ctx(struct ceph_acl_sec_ctx *as_ctx)
#ifdef CONFIG_FS_ENCRYPTION
kfree(as_ctx->fscrypt_auth);
#endif
- if (as_ctx->pagelist)
- ceph_pagelist_release(as_ctx->pagelist);
+ ceph_databuf_release(as_ctx->dbuf);
}

/*


2023-08-04 14:07:21

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 16/18] ceph: Remove CEPH_OSD_DATA_TYPE_PAGES and its attendant helpers

---
include/linux/ceph/osd_client.h | 20 ++----------
net/ceph/osd_client.c | 57 +--------------------------------
2 files changed, 3 insertions(+), 74 deletions(-)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 3099f923c241..1a1137787487 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -103,23 +103,13 @@ struct ceph_osd {
enum ceph_osd_data_type {
CEPH_OSD_DATA_TYPE_NONE = 0,
CEPH_OSD_DATA_TYPE_DATABUF,
- CEPH_OSD_DATA_TYPE_PAGES,
CEPH_OSD_DATA_TYPE_ITER,
};

struct ceph_osd_data {
enum ceph_osd_data_type type;
- union {
- struct ceph_databuf *dbuf;
- struct {
- struct page **pages;
- u64 length;
- u32 offset;
- bool pages_from_pool;
- bool own_pages;
- };
- struct iov_iter iter;
- };
+ struct ceph_databuf *dbuf;
+ struct iov_iter iter;
};

struct ceph_osd_req_op {
@@ -451,12 +441,6 @@ void ceph_osdc_clear_abort_err(struct ceph_osd_client *osdc);
struct ceph_osd_req_op *osd_req_op_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode, u32 flags);

-extern void osd_req_op_raw_data_in_pages(struct ceph_osd_request *,
- unsigned int which,
- struct page **pages, u64 length,
- u32 offset, bool pages_from_pool,
- bool own_pages);
-
extern void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 offset, u64 length,
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 0fe16fdc760f..70f81a0b62c0 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -122,21 +122,6 @@ static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

-/*
- * Consumes @pages if @own_pages is true.
- */
-static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
- struct page **pages, u64 length, u32 offset,
- bool pages_from_pool, bool own_pages)
-{
- osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
- osd_data->pages = pages;
- osd_data->length = length;
- osd_data->offset = offset;
- osd_data->pages_from_pool = pages_from_pool;
- osd_data->own_pages = own_pages;
-}
-
static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
struct iov_iter *iter)
{
@@ -181,19 +166,6 @@ void osd_req_op_raw_data_in_databuf(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_databuf);

-void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
- unsigned int which, struct page **pages,
- u64 length, u32 offset,
- bool pages_from_pool, bool own_pages)
-{
- struct ceph_osd_data *osd_data;
-
- osd_data = osd_req_op_raw_data_in(osd_req, which);
- ceph_osd_data_pages_init(osd_data, pages, length, offset,
- pages_from_pool, own_pages);
-}
-EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
-
void osd_req_op_extent_osd_databuf(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_databuf *dbuf)
@@ -205,19 +177,6 @@ void osd_req_op_extent_osd_databuf(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_extent_osd_databuf);

-void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
- unsigned int which, struct page **pages,
- u64 length, u32 offset,
- bool pages_from_pool, bool own_pages)
-{
- struct ceph_osd_data *osd_data;
-
- osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
- ceph_osd_data_pages_init(osd_data, pages, length, offset,
- pages_from_pool, own_pages);
-}
-EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
-
/**
* osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer
* @osd_req: The request to set up
@@ -285,8 +244,6 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
switch (osd_data->type) {
case CEPH_OSD_DATA_TYPE_NONE:
return 0;
- case CEPH_OSD_DATA_TYPE_PAGES:
- return osd_data->length;
case CEPH_OSD_DATA_TYPE_ITER:
return iov_iter_count(&osd_data->iter);
default:
@@ -297,13 +254,6 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)

static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
- if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
- int num_pages;
-
- num_pages = calc_pages_for((u64)osd_data->offset,
- (u64)osd_data->length);
- ceph_release_page_vector(osd_data->pages, num_pages);
- }
ceph_osd_data_init(osd_data);
}

@@ -881,12 +831,7 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
{
u64 length = ceph_osd_data_length(osd_data);

- if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
- BUG_ON(length > (u64) SIZE_MAX);
- if (length)
- ceph_msg_data_add_pages(msg, osd_data->pages,
- length, osd_data->offset, false);
- } else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
+ if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
ceph_msg_data_add_iter(msg, &osd_data->iter);
} else {
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);


2023-08-04 14:09:47

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 18/18] ceph: Don't use data_pages

---
fs/ceph/addr.c | 16 +++++-----------
fs/ceph/file.c | 34 +++++++++++++++------------------
include/linux/ceph/osd_client.h | 1 +
net/ceph/osd_client.c | 16 ++--------------
4 files changed, 23 insertions(+), 44 deletions(-)

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 7571606cf61f..7557f4a85ef0 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -277,11 +277,6 @@ static void finish_netfs_read(struct ceph_osd_request *req)
}
}

- if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
- ceph_put_page_vector(osd_data->pages,
- calc_pages_for(osd_data->offset,
- osd_data->length), false);
- }
netfs_subreq_terminated(subreq, err, false);
iput(req->r_inode);
ceph_dec_osd_stopping_blocker(fsc->mdsc);
@@ -2007,7 +2002,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
struct rb_node **p, *parent;
struct ceph_pool_perm *perm;
- struct page **pages;
+ struct ceph_databuf *dbuf;
size_t pool_ns_len;
int err = 0, err2 = 0, have = 0;

@@ -2107,14 +2102,13 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
goto out_unlock;

/* one page should be large enough for STAT data */
- pages = ceph_alloc_page_vector(1, GFP_KERNEL);
- if (IS_ERR(pages)) {
- err = PTR_ERR(pages);
+ dbuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_KERNEL);
+ if (!dbuf) {
+ err = -ENOMEM;
goto out_unlock;
}

- osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
- 0, false, true);
+ osd_req_op_raw_data_in_databuf(rd_req, 0, dbuf);
ceph_osdc_start_request(&fsc->client->osdc, rd_req);

wr_req->r_mtime = ci->netfs.inode.i_mtime;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 5d16469a3690..caf557187ca8 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -977,6 +977,7 @@ ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,
struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
struct ceph_client *cl = fsc->client;
struct ceph_osd_client *osdc = &fsc->client->osdc;
+ struct ceph_databuf *dbuf;
ssize_t ret;
u64 off = *ki_pos;
u64 len = iov_iter_count(to);
@@ -1041,16 +1042,14 @@ ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,

num_pages = calc_pages_for(read_off, read_len);
page_off = offset_in_page(off);
- pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
- if (IS_ERR(pages)) {
+ dbuf = ceph_databuf_alloc(num_pages, read_len, GFP_KERNEL);
+ if (!dbuf) {
ceph_osdc_put_request(req);
- ret = PTR_ERR(pages);
+ ret = -ENOMEM;
break;
}

- osd_req_op_extent_osd_data_pages(req, 0, pages, read_len,
- offset_in_page(read_off),
- false, false);
+ osd_req_op_extent_osd_databuf(req, 0, dbuf);

op = &req->r_ops[0];
if (sparse) {
@@ -1137,7 +1136,7 @@ ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,
break;
}
}
- ceph_release_page_vector(pages, num_pages);
+ ceph_databuf_release(dbuf);

if (ret < 0) {
if (ret == -EBLOCKLISTED)
@@ -1625,7 +1624,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
struct ceph_client *cl = fsc->client;
struct ceph_osd_client *osdc = &fsc->client->osdc;
struct ceph_osd_request *req;
- struct page **pages;
+ struct ceph_databuf *dbuf = NULL;
u64 len;
int num_pages;
int written = 0;
@@ -1691,9 +1690,9 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
* an array of pagecache pages.
*/
num_pages = calc_pages_for(write_pos, write_len);
- pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
- if (IS_ERR(pages)) {
- ret = PTR_ERR(pages);
+ dbuf = ceph_databuf_alloc(num_pages, num_pages * PAGE_SIZE, GFP_KERNEL);
+ if (!dbuf) {
+ ret = -ENOMEM;
break;
}

@@ -1722,7 +1721,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
NULL, ci->i_truncate_seq,
ci->i_truncate_size, false);
if (IS_ERR(req)) {
- ceph_release_page_vector(pages, num_pages);
ret = PTR_ERR(req);
break;
}
@@ -1730,7 +1728,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
/* Something is misaligned! */
if (read_len != CEPH_FSCRYPT_BLOCK_SIZE) {
ceph_osdc_put_request(req);
- ceph_release_page_vector(pages, num_pages);
ret = -EIO;
break;
}
@@ -1739,15 +1736,14 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
op = &req->r_ops[0];

if (first) {
- osd_req_op_extent_osd_data_pages(req, 0, pages,
- CEPH_FSCRYPT_BLOCK_SIZE,
- offset_in_page(first_pos),
- false, false);
+ iov_iter_advance(&dbuf->iter, offset_in_page(first_pos));
+ osd_req_op_extent_osd_databuf(req, 0, dbuf,
+ CEPH_FSCRYPT_BLOCK_SIZE);
+ dbuf = NULL;
/* We only expect a single extent here */
ret = __ceph_alloc_sparse_ext_map(op, 1);
if (ret) {
ceph_osdc_put_request(req);
- ceph_release_page_vector(pages, num_pages);
break;
}
}
@@ -1766,7 +1762,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
ret = __ceph_alloc_sparse_ext_map(op, 1);
if (ret) {
ceph_osdc_put_request(req);
- ceph_release_page_vector(pages, num_pages);
break;
}

@@ -1998,6 +1993,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,

}

+ ceph_databuf_release(dbuf);
if (ret != -EOLDSNAPC && written > 0) {
ret = written;
iocb->ki_pos = pos;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 1a1137787487..c26a7866695a 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -110,6 +110,7 @@ struct ceph_osd_data {
enum ceph_osd_data_type type;
struct ceph_databuf *dbuf;
struct iov_iter iter;
+ size_t length;
};

struct ceph_osd_req_op {
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 6fb78ae14f03..95daf4cdb07b 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -127,6 +127,7 @@ static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
{
osd_data->type = CEPH_OSD_DATA_TYPE_ITER;
osd_data->iter = *iter;
+ osd_data->length = iter->count;
}

/*
@@ -239,19 +240,6 @@ void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_cls_response_databuf);

-static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
-{
- switch (osd_data->type) {
- case CEPH_OSD_DATA_TYPE_NONE:
- return 0;
- case CEPH_OSD_DATA_TYPE_ITER:
- return iov_iter_count(&osd_data->iter);
- default:
- WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
- return 0;
- }
-}
-
static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
ceph_osd_data_init(osd_data);
@@ -4475,7 +4463,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
if (data) {
if (lreq->reply) {
WARN_ON(data->type !=
- CEPH_MSG_DATA_PAGES);
+ CEPH_MSG_DATA_DATABUF);
*lreq->preply_pages = data->pages;
*lreq->preply_len = data->length;
data->own_pages = false;


2023-08-04 14:17:21

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 14/18] ceph: Remove ceph_pagelist

Remove ceph_pagelist and its helpers.

Signed-off-by: David Howells <dhowells@redhat.com>
---
fs/ceph/locks.c | 1 -
fs/ceph/mds_client.c | 1 -
fs/ceph/xattr.c | 1 -
include/linux/ceph/messenger.h | 8 --
include/linux/ceph/osd_client.h | 9 --
include/linux/ceph/pagelist.h | 72 --------------
net/ceph/Makefile | 2 +-
net/ceph/messenger.c | 110 --------------------
net/ceph/osd_client.c | 51 ----------
net/ceph/pagelist.c | 171 --------------------------------
10 files changed, 1 insertion(+), 425 deletions(-)
delete mode 100644 include/linux/ceph/pagelist.h
delete mode 100644 net/ceph/pagelist.c

diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index b3c018a8a92f..f80b09304fdc 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -8,7 +8,6 @@
#include "super.h"
#include "mds_client.h"
#include <linux/filelock.h>
-#include <linux/ceph/pagelist.h>

static u64 lock_secret;
static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 9f5c4f47982e..e94877725824 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -21,7 +21,6 @@
#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
-#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index ca3ec5dd0382..d42779d10dc9 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -1,6 +1,5 @@
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>
-#include <linux/ceph/pagelist.h>

#include "super.h"
#include "mds_client.h"
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 0f4cc6e39da0..a2489e266bff 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -119,7 +119,6 @@ enum ceph_msg_data_type {
CEPH_MSG_DATA_NONE, /* message contains no data payload */
CEPH_MSG_DATA_DATABUF, /* data source/destination is a data buffer */
CEPH_MSG_DATA_PAGES, /* data source/destination is a page array */
- CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */
CEPH_MSG_DATA_ITER, /* data source/destination is an iov_iter */
};

@@ -135,7 +134,6 @@ struct ceph_msg_data {
unsigned int offset; /* first page */
bool own_pages;
};
- struct ceph_pagelist *pagelist;
};
};

@@ -152,10 +150,6 @@ struct ceph_msg_data_cursor {
unsigned short page_index; /* index in array */
unsigned short page_count; /* pages in array */
};
- struct { /* pagelist */
- struct page *page; /* page from list */
- size_t offset; /* bytes from list */
- };
struct {
struct iov_iter iov_iter;
unsigned int lastlen;
@@ -510,8 +504,6 @@ extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf);
void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t offset, bool own_pages);
-extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
- struct ceph_pagelist *pagelist);
void ceph_msg_data_add_iter(struct ceph_msg *msg,
struct iov_iter *iter);

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 82c1c325861d..83c3073c44bb 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -15,7 +15,6 @@
#include <linux/ceph/messenger.h>
#include <linux/ceph/msgpool.h>
#include <linux/ceph/auth.h>
-#include <linux/ceph/pagelist.h>
#include <linux/ceph/databuf.h>

struct ceph_msg;
@@ -105,7 +104,6 @@ enum ceph_osd_data_type {
CEPH_OSD_DATA_TYPE_NONE = 0,
CEPH_OSD_DATA_TYPE_DATABUF,
CEPH_OSD_DATA_TYPE_PAGES,
- CEPH_OSD_DATA_TYPE_PAGELIST,
CEPH_OSD_DATA_TYPE_ITER,
};

@@ -120,7 +118,6 @@ struct ceph_osd_data {
bool pages_from_pool;
bool own_pages;
};
- struct ceph_pagelist *pagelist;
struct iov_iter iter;
};
};
@@ -486,18 +483,12 @@ extern void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *,
void osd_req_op_raw_data_in_databuf(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_databuf *databuf);
-extern void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *,
- unsigned int which,
- struct ceph_pagelist *pagelist);
void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req,
unsigned int which, struct iov_iter *iter);

void osd_req_op_cls_request_databuf(struct ceph_osd_request *req,
unsigned int which,
struct ceph_databuf *dbuf);
-extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
- unsigned int which,
- struct ceph_pagelist *pagelist);
void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_databuf *dbuf);
diff --git a/include/linux/ceph/pagelist.h b/include/linux/ceph/pagelist.h
deleted file mode 100644
index 5dead8486fd8..000000000000
--- a/include/linux/ceph/pagelist.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/* SPDX-License-Identifier: GPL-2.0 */
-#ifndef __FS_CEPH_PAGELIST_H
-#define __FS_CEPH_PAGELIST_H
-
-#include <asm/byteorder.h>
-#include <linux/refcount.h>
-#include <linux/list.h>
-#include <linux/types.h>
-
-struct ceph_pagelist {
- struct list_head head;
- void *mapped_tail;
- size_t length;
- size_t room;
- struct list_head free_list;
- size_t num_pages_free;
- refcount_t refcnt;
-};
-
-struct ceph_pagelist_cursor {
- struct ceph_pagelist *pl; /* pagelist, for error checking */
- struct list_head *page_lru; /* page in list */
- size_t room; /* room remaining to reset to */
-};
-
-struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags);
-
-extern void ceph_pagelist_release(struct ceph_pagelist *pl);
-
-extern int ceph_pagelist_append(struct ceph_pagelist *pl, const void *d, size_t l);
-
-extern int ceph_pagelist_reserve(struct ceph_pagelist *pl, size_t space);
-
-extern int ceph_pagelist_free_reserve(struct ceph_pagelist *pl);
-
-extern void ceph_pagelist_set_cursor(struct ceph_pagelist *pl,
- struct ceph_pagelist_cursor *c);
-
-extern int ceph_pagelist_truncate(struct ceph_pagelist *pl,
- struct ceph_pagelist_cursor *c);
-
-static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v)
-{
- __le64 ev = cpu_to_le64(v);
- return ceph_pagelist_append(pl, &ev, sizeof(ev));
-}
-static inline int ceph_pagelist_encode_32(struct ceph_pagelist *pl, u32 v)
-{
- __le32 ev = cpu_to_le32(v);
- return ceph_pagelist_append(pl, &ev, sizeof(ev));
-}
-static inline int ceph_pagelist_encode_16(struct ceph_pagelist *pl, u16 v)
-{
- __le16 ev = cpu_to_le16(v);
- return ceph_pagelist_append(pl, &ev, sizeof(ev));
-}
-static inline int ceph_pagelist_encode_8(struct ceph_pagelist *pl, u8 v)
-{
- return ceph_pagelist_append(pl, &v, 1);
-}
-static inline int ceph_pagelist_encode_string(struct ceph_pagelist *pl,
- char *s, u32 len)
-{
- int ret = ceph_pagelist_encode_32(pl, len);
- if (ret)
- return ret;
- if (len)
- return ceph_pagelist_append(pl, s, len);
- return 0;
-}
-
-#endif
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 4b2e0b654e45..0c8787e2e733 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -4,7 +4,7 @@
#
obj-$(CONFIG_CEPH_LIB) += libceph.o

-libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
+libceph-y := ceph_common.o messenger.o msgpool.o buffer.o \
mon_client.o decode.o \
cls_lock_client.o \
osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 1ef3576c930d..5b28c27858b2 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -20,7 +20,6 @@
#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
-#include <linux/ceph/pagelist.h>
#include <linux/export.h>

/*
@@ -775,87 +774,6 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
return true;
}

-/*
- * For a pagelist, a piece is whatever remains to be consumed in the
- * first page in the list, or the front of the next page.
- */
-static void
-ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
- size_t length)
-{
- struct ceph_msg_data *data = cursor->data;
- struct ceph_pagelist *pagelist;
- struct page *page;
-
- BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
-
- pagelist = data->pagelist;
- BUG_ON(!pagelist);
-
- if (!length)
- return; /* pagelist can be assigned but empty */
-
- BUG_ON(list_empty(&pagelist->head));
- page = list_first_entry(&pagelist->head, struct page, lru);
-
- cursor->resid = min(length, pagelist->length);
- cursor->page = page;
- cursor->offset = 0;
-}
-
-static struct page *
-ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
- size_t *page_offset, size_t *length)
-{
- struct ceph_msg_data *data = cursor->data;
- struct ceph_pagelist *pagelist;
-
- BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
-
- pagelist = data->pagelist;
- BUG_ON(!pagelist);
-
- BUG_ON(!cursor->page);
- BUG_ON(cursor->offset + cursor->resid != pagelist->length);
-
- /* offset of first page in pagelist is always 0 */
- *page_offset = cursor->offset & ~PAGE_MASK;
- *length = min_t(size_t, cursor->resid, PAGE_SIZE - *page_offset);
- return cursor->page;
-}
-
-static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
- size_t bytes)
-{
- struct ceph_msg_data *data = cursor->data;
- struct ceph_pagelist *pagelist;
-
- BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
-
- pagelist = data->pagelist;
- BUG_ON(!pagelist);
-
- BUG_ON(cursor->offset + cursor->resid != pagelist->length);
- BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
-
- /* Advance the cursor offset */
-
- cursor->resid -= bytes;
- cursor->offset += bytes;
- /* offset of first page in pagelist is always 0 */
- if (!bytes || cursor->offset & ~PAGE_MASK)
- return false; /* more bytes to process in the current page */
-
- if (!cursor->resid)
- return false; /* no more data */
-
- /* Move on to the next page */
-
- BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
- cursor->page = list_next_entry(cursor->page, lru);
- return true;
-}
-
static void ceph_msg_data_iter_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length)
{
@@ -926,9 +844,6 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
size_t length = cursor->total_resid;

switch (cursor->data->type) {
- case CEPH_MSG_DATA_PAGELIST:
- ceph_msg_data_pagelist_cursor_init(cursor, length);
- break;
case CEPH_MSG_DATA_PAGES:
ceph_msg_data_pages_cursor_init(cursor, length);
break;
@@ -968,9 +883,6 @@ struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
struct page *page;

switch (cursor->data->type) {
- case CEPH_MSG_DATA_PAGELIST:
- page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
- break;
case CEPH_MSG_DATA_PAGES:
page = ceph_msg_data_pages_next(cursor, page_offset, length);
break;
@@ -1001,9 +913,6 @@ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)

BUG_ON(bytes > cursor->resid);
switch (cursor->data->type) {
- case CEPH_MSG_DATA_PAGELIST:
- new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
- break;
case CEPH_MSG_DATA_PAGES:
new_piece = ceph_msg_data_pages_advance(cursor, bytes);
break;
@@ -1740,8 +1649,6 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
} else if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
int num_pages = calc_pages_for(data->offset, data->length);
ceph_release_page_vector(data->pages, num_pages);
- } else if (data->type == CEPH_MSG_DATA_PAGELIST) {
- ceph_pagelist_release(data->pagelist);
}
}

@@ -1782,23 +1689,6 @@ void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
}
EXPORT_SYMBOL(ceph_msg_data_add_pages);

-void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
- struct ceph_pagelist *pagelist)
-{
- struct ceph_msg_data *data;
-
- BUG_ON(!pagelist);
- BUG_ON(!pagelist->length);
-
- data = ceph_msg_data_add(msg);
- data->type = CEPH_MSG_DATA_PAGELIST;
- refcount_inc(&pagelist->refcnt);
- data->pagelist = pagelist;
-
- msg->data_length += pagelist->length;
-}
-EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
-
void ceph_msg_data_add_iter(struct ceph_msg *msg,
struct iov_iter *iter)
{
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index c4486799f54b..8cbe06d2e16d 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -16,7 +16,6 @@
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
-#include <linux/ceph/pagelist.h>
#include <linux/ceph/striper.h>

#define OSD_OPREPLY_FRONT_LEN 512
@@ -138,16 +137,6 @@ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
osd_data->own_pages = own_pages;
}

-/*
- * Consumes a ref on @pagelist.
- */
-static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
- struct ceph_pagelist *pagelist)
-{
- osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
- osd_data->pagelist = pagelist;
-}
-
static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
struct iov_iter *iter)
{
@@ -229,16 +218,6 @@ void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

-void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
- unsigned int which, struct ceph_pagelist *pagelist)
-{
- struct ceph_osd_data *osd_data;
-
- osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
- ceph_osd_data_pagelist_init(osd_data, pagelist);
-}
-EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
-
/**
* osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer
* @osd_req: The request to set up
@@ -265,16 +244,6 @@ static void osd_req_op_cls_request_info_databuf(struct ceph_osd_request *osd_req
ceph_osd_databuf_init(osd_data, dbuf);
}

-static void osd_req_op_cls_request_info_pagelist(
- struct ceph_osd_request *osd_req,
- unsigned int which, struct ceph_pagelist *pagelist)
-{
- struct ceph_osd_data *osd_data;
-
- osd_data = osd_req_op_data(osd_req, which, cls, request_info);
- ceph_osd_data_pagelist_init(osd_data, pagelist);
-}
-
void osd_req_op_cls_request_databuf(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_databuf *dbuf)
@@ -288,19 +257,6 @@ void osd_req_op_cls_request_databuf(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_cls_request_databuf);

-void osd_req_op_cls_request_data_pagelist(
- struct ceph_osd_request *osd_req,
- unsigned int which, struct ceph_pagelist *pagelist)
-{
- struct ceph_osd_data *osd_data;
-
- osd_data = osd_req_op_data(osd_req, which, cls, request_data);
- ceph_osd_data_pagelist_init(osd_data, pagelist);
- osd_req->r_ops[which].cls.indata_len += pagelist->length;
- osd_req->r_ops[which].indata_len += pagelist->length;
-}
-EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
-
static void osd_req_op_cls_request_data_iter(
struct ceph_osd_request *osd_req,
unsigned int which, struct iov_iter *iter)
@@ -331,8 +287,6 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
return 0;
case CEPH_OSD_DATA_TYPE_PAGES:
return osd_data->length;
- case CEPH_OSD_DATA_TYPE_PAGELIST:
- return (u64)osd_data->pagelist->length;
case CEPH_OSD_DATA_TYPE_ITER:
return iov_iter_count(&osd_data->iter);
default:
@@ -349,8 +303,6 @@ static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
num_pages = calc_pages_for((u64)osd_data->offset,
(u64)osd_data->length);
ceph_release_page_vector(osd_data->pages, num_pages);
- } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
- ceph_pagelist_release(osd_data->pagelist);
}
ceph_osd_data_init(osd_data);
}
@@ -934,9 +886,6 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
if (length)
ceph_msg_data_add_pages(msg, osd_data->pages,
length, osd_data->offset, false);
- } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
- BUG_ON(!length);
- ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
ceph_msg_data_add_iter(msg, &osd_data->iter);
} else {
diff --git a/net/ceph/pagelist.c b/net/ceph/pagelist.c
deleted file mode 100644
index 74622b278d57..000000000000
--- a/net/ceph/pagelist.c
+++ /dev/null
@@ -1,171 +0,0 @@
-// SPDX-License-Identifier: GPL-2.0
-#include <linux/module.h>
-#include <linux/gfp.h>
-#include <linux/slab.h>
-#include <linux/pagemap.h>
-#include <linux/highmem.h>
-#include <linux/ceph/pagelist.h>
-
-struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags)
-{
- struct ceph_pagelist *pl;
-
- pl = kmalloc(sizeof(*pl), gfp_flags);
- if (!pl)
- return NULL;
-
- INIT_LIST_HEAD(&pl->head);
- pl->mapped_tail = NULL;
- pl->length = 0;
- pl->room = 0;
- INIT_LIST_HEAD(&pl->free_list);
- pl->num_pages_free = 0;
- refcount_set(&pl->refcnt, 1);
-
- return pl;
-}
-EXPORT_SYMBOL(ceph_pagelist_alloc);
-
-static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
-{
- if (pl->mapped_tail) {
- struct page *page = list_entry(pl->head.prev, struct page, lru);
- kunmap(page);
- pl->mapped_tail = NULL;
- }
-}
-
-void ceph_pagelist_release(struct ceph_pagelist *pl)
-{
- if (!refcount_dec_and_test(&pl->refcnt))
- return;
- ceph_pagelist_unmap_tail(pl);
- while (!list_empty(&pl->head)) {
- struct page *page = list_first_entry(&pl->head, struct page,
- lru);
- list_del(&page->lru);
- __free_page(page);
- }
- ceph_pagelist_free_reserve(pl);
- kfree(pl);
-}
-EXPORT_SYMBOL(ceph_pagelist_release);
-
-static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
-{
- struct page *page;
-
- if (!pl->num_pages_free) {
- page = __page_cache_alloc(GFP_NOFS);
- } else {
- page = list_first_entry(&pl->free_list, struct page, lru);
- list_del(&page->lru);
- --pl->num_pages_free;
- }
- if (!page)
- return -ENOMEM;
- pl->room += PAGE_SIZE;
- ceph_pagelist_unmap_tail(pl);
- list_add_tail(&page->lru, &pl->head);
- pl->mapped_tail = kmap(page);
- return 0;
-}
-
-int ceph_pagelist_append(struct ceph_pagelist *pl, const void *buf, size_t len)
-{
- while (pl->room < len) {
- size_t bit = pl->room;
- int ret;
-
- memcpy(pl->mapped_tail + (pl->length & ~PAGE_MASK),
- buf, bit);
- pl->length += bit;
- pl->room -= bit;
- buf += bit;
- len -= bit;
- ret = ceph_pagelist_addpage(pl);
- if (ret)
- return ret;
- }
-
- memcpy(pl->mapped_tail + (pl->length & ~PAGE_MASK), buf, len);
- pl->length += len;
- pl->room -= len;
- return 0;
-}
-EXPORT_SYMBOL(ceph_pagelist_append);
-
-/* Allocate enough pages for a pagelist to append the given amount
- * of data without allocating.
- * Returns: 0 on success, -ENOMEM on error.
- */
-int ceph_pagelist_reserve(struct ceph_pagelist *pl, size_t space)
-{
- if (space <= pl->room)
- return 0;
- space -= pl->room;
- space = (space + PAGE_SIZE - 1) >> PAGE_SHIFT; /* conv to num pages */
-
- while (space > pl->num_pages_free) {
- struct page *page = __page_cache_alloc(GFP_NOFS);
- if (!page)
- return -ENOMEM;
- list_add_tail(&page->lru, &pl->free_list);
- ++pl->num_pages_free;
- }
- return 0;
-}
-EXPORT_SYMBOL(ceph_pagelist_reserve);
-
-/* Free any pages that have been preallocated. */
-int ceph_pagelist_free_reserve(struct ceph_pagelist *pl)
-{
- while (!list_empty(&pl->free_list)) {
- struct page *page = list_first_entry(&pl->free_list,
- struct page, lru);
- list_del(&page->lru);
- __free_page(page);
- --pl->num_pages_free;
- }
- BUG_ON(pl->num_pages_free);
- return 0;
-}
-EXPORT_SYMBOL(ceph_pagelist_free_reserve);
-
-/* Create a truncation point. */
-void ceph_pagelist_set_cursor(struct ceph_pagelist *pl,
- struct ceph_pagelist_cursor *c)
-{
- c->pl = pl;
- c->page_lru = pl->head.prev;
- c->room = pl->room;
-}
-EXPORT_SYMBOL(ceph_pagelist_set_cursor);
-
-/* Truncate a pagelist to the given point. Move extra pages to reserve.
- * This won't sleep.
- * Returns: 0 on success,
- * -EINVAL if the pagelist doesn't match the trunc point pagelist
- */
-int ceph_pagelist_truncate(struct ceph_pagelist *pl,
- struct ceph_pagelist_cursor *c)
-{
- struct page *page;
-
- if (pl != c->pl)
- return -EINVAL;
- ceph_pagelist_unmap_tail(pl);
- while (pl->head.prev != c->page_lru) {
- page = list_entry(pl->head.prev, struct page, lru);
- /* move from pagelist to reserve */
- list_move_tail(&page->lru, &pl->free_list);
- ++pl->num_pages_free;
- }
- pl->room = c->room;
- if (!list_empty(&pl->head)) {
- page = list_entry(pl->head.prev, struct page, lru);
- pl->mapped_tail = kmap(page);
- }
- return 0;
-}
-EXPORT_SYMBOL(ceph_pagelist_truncate);


2023-08-04 14:18:07

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 02/18] ceph: Rename alignment to offset

---
fs/ceph/addr.c | 4 ++--
include/linux/ceph/messenger.h | 4 ++--
include/linux/ceph/osd_client.h | 10 +++++-----
net/ceph/messenger.c | 10 +++++-----
net/ceph/osd_client.c | 24 ++++++++++++------------
5 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 228eab6706cd..7571606cf61f 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -279,7 +279,7 @@ static void finish_netfs_read(struct ceph_osd_request *req)

if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
ceph_put_page_vector(osd_data->pages,
- calc_pages_for(osd_data->alignment,
+ calc_pages_for(osd_data->offset,
osd_data->length), false);
}
netfs_subreq_terminated(subreq, err, false);
@@ -881,7 +881,7 @@ static void writepages_finish(struct ceph_osd_request *req)
osd_data = osd_req_op_extent_osd_data(req, i);
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
len += osd_data->length;
- num_pages = calc_pages_for((u64)osd_data->alignment,
+ num_pages = calc_pages_for((u64)osd_data->offset,
(u64)osd_data->length);
total_pages += num_pages;
for (j = 0; j < num_pages; j++) {
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 2eaaabbe98cb..f6f11bf9d63e 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -221,7 +221,7 @@ struct ceph_msg_data {
struct {
struct page **pages;
size_t length; /* total # bytes */
- unsigned int alignment; /* first page */
+ unsigned int offset; /* first page */
bool own_pages;
};
struct ceph_pagelist *pagelist;
@@ -602,7 +602,7 @@ extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
unsigned long interval);

void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
- size_t length, size_t alignment, bool own_pages);
+ size_t length, size_t offset, bool own_pages);
extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist);
#ifdef CONFIG_BLOCK
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 41bcd71cfa7a..3dabebbdb5dc 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -117,7 +117,7 @@ struct ceph_osd_data {
struct {
struct page **pages;
u64 length;
- u32 alignment;
+ u32 offset;
bool pages_from_pool;
bool own_pages;
};
@@ -470,7 +470,7 @@ struct ceph_osd_req_op *osd_req_op_init(struct ceph_osd_request *osd_req,
extern void osd_req_op_raw_data_in_pages(struct ceph_osd_request *,
unsigned int which,
struct page **pages, u64 length,
- u32 alignment, bool pages_from_pool,
+ u32 offset, bool pages_from_pool,
bool own_pages);

extern void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
@@ -489,7 +489,7 @@ extern struct ceph_osd_data *osd_req_op_extent_osd_data(
extern void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *,
unsigned int which,
struct page **pages, u64 length,
- u32 alignment, bool pages_from_pool,
+ u32 offset, bool pages_from_pool,
bool own_pages);
extern void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *,
unsigned int which,
@@ -516,7 +516,7 @@ extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
extern void osd_req_op_cls_request_data_pages(struct ceph_osd_request *,
unsigned int which,
struct page **pages, u64 length,
- u32 alignment, bool pages_from_pool,
+ u32 offset, bool pages_from_pool,
bool own_pages);
void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
unsigned int which,
@@ -525,7 +525,7 @@ void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
unsigned int which,
struct page **pages, u64 length,
- u32 alignment, bool pages_from_pool,
+ u32 offset, bool pages_from_pool,
bool own_pages);
int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
const char *class, const char *method);
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9dce65fac0bd..6cfc6b69052f 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -840,8 +840,8 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
BUG_ON(!data->length);

cursor->resid = min(length, data->length);
- page_count = calc_pages_for(data->alignment, (u64)data->length);
- cursor->page_offset = data->alignment & ~PAGE_MASK;
+ page_count = calc_pages_for(data->offset, (u64)data->length);
+ cursor->page_offset = data->offset & ~PAGE_MASK;
cursor->page_index = 0;
BUG_ON(page_count > (int)USHRT_MAX);
cursor->page_count = (unsigned short)page_count;
@@ -1873,7 +1873,7 @@ static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{
if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
- int num_pages = calc_pages_for(data->alignment, data->length);
+ int num_pages = calc_pages_for(data->offset, data->length);
ceph_release_page_vector(data->pages, num_pages);
} else if (data->type == CEPH_MSG_DATA_PAGELIST) {
ceph_pagelist_release(data->pagelist);
@@ -1881,7 +1881,7 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
}

void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
- size_t length, size_t alignment, bool own_pages)
+ size_t length, size_t offset, bool own_pages)
{
struct ceph_msg_data *data;

@@ -1892,7 +1892,7 @@ void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
data->type = CEPH_MSG_DATA_PAGES;
data->pages = pages;
data->length = length;
- data->alignment = alignment & ~PAGE_MASK;
+ data->offset = offset & ~PAGE_MASK;
data->own_pages = own_pages;

msg->data_length += length;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 78b622178a3d..e3152e21418f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -130,13 +130,13 @@ static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
* Consumes @pages if @own_pages is true.
*/
static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
- struct page **pages, u64 length, u32 alignment,
+ struct page **pages, u64 length, u32 offset,
bool pages_from_pool, bool own_pages)
{
osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
osd_data->pages = pages;
osd_data->length = length;
- osd_data->alignment = alignment;
+ osd_data->offset = offset;
osd_data->pages_from_pool = pages_from_pool;
osd_data->own_pages = own_pages;
}
@@ -196,26 +196,26 @@ EXPORT_SYMBOL(osd_req_op_extent_osd_data);

void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
unsigned int which, struct page **pages,
- u64 length, u32 alignment,
+ u64 length, u32 offset,
bool pages_from_pool, bool own_pages)
{
struct ceph_osd_data *osd_data;

osd_data = osd_req_op_raw_data_in(osd_req, which);
- ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ ceph_osd_data_pages_init(osd_data, pages, length, offset,
pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
unsigned int which, struct page **pages,
- u64 length, u32 alignment,
+ u64 length, u32 offset,
bool pages_from_pool, bool own_pages)
{
struct ceph_osd_data *osd_data;

osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
- ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ ceph_osd_data_pages_init(osd_data, pages, length, offset,
pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
@@ -312,12 +312,12 @@ EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
unsigned int which, struct page **pages, u64 length,
- u32 alignment, bool pages_from_pool, bool own_pages)
+ u32 offset, bool pages_from_pool, bool own_pages)
{
struct ceph_osd_data *osd_data;

osd_data = osd_req_op_data(osd_req, which, cls, request_data);
- ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ ceph_osd_data_pages_init(osd_data, pages, length, offset,
pages_from_pool, own_pages);
osd_req->r_ops[which].cls.indata_len += length;
osd_req->r_ops[which].indata_len += length;
@@ -344,12 +344,12 @@ EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);

void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
unsigned int which, struct page **pages, u64 length,
- u32 alignment, bool pages_from_pool, bool own_pages)
+ u32 offset, bool pages_from_pool, bool own_pages)
{
struct ceph_osd_data *osd_data;

osd_data = osd_req_op_data(osd_req, which, cls, response_data);
- ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ ceph_osd_data_pages_init(osd_data, pages, length, offset,
pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
@@ -382,7 +382,7 @@ static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
int num_pages;

- num_pages = calc_pages_for((u64)osd_data->alignment,
+ num_pages = calc_pages_for((u64)osd_data->offset,
(u64)osd_data->length);
ceph_release_page_vector(osd_data->pages, num_pages);
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
@@ -969,7 +969,7 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
BUG_ON(length > (u64) SIZE_MAX);
if (length)
ceph_msg_data_add_pages(msg, osd_data->pages,
- length, osd_data->alignment, false);
+ length, osd_data->offset, false);
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
BUG_ON(!length);
ceph_msg_data_add_pagelist(msg, osd_data->pagelist);


2023-08-04 14:39:30

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 03/18] ceph: Add a new data container type, ceph_databuf

Add a new ceph data container type, ceph_databuf, that carries a list of
pages in a bvec and use an iov_iter to handle the addition of data.

This is intended to replace all other types.

Signed-off-by: David Howells <[email protected]>
---
include/linux/ceph/databuf.h | 65 ++++++++++++++
include/linux/ceph/messenger.h | 6 +-
include/linux/ceph/osd_client.h | 9 ++
net/ceph/Makefile | 3 +-
net/ceph/databuf.c | 149 ++++++++++++++++++++++++++++++++
net/ceph/messenger.c | 22 ++++-
6 files changed, 251 insertions(+), 3 deletions(-)
create mode 100644 include/linux/ceph/databuf.h
create mode 100644 net/ceph/databuf.c

diff --git a/include/linux/ceph/databuf.h b/include/linux/ceph/databuf.h
new file mode 100644
index 000000000000..7146e3484250
--- /dev/null
+++ b/include/linux/ceph/databuf.h
@@ -0,0 +1,65 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __FS_CEPH_DATABUF_H
+#define __FS_CEPH_DATABUF_H
+
+#include <asm/byteorder.h>
+#include <linux/refcount.h>
+#include <linux/blk_types.h>
+
+struct ceph_databuf {
+ struct bio_vec *bvec; /* List of pages */
+ struct iov_iter iter; /* Iterator holding append point */
+ size_t length; /* Amount of data stored */
+ size_t limit; /* Maximum length before expansion required */
+ size_t nr_bvec; /* Number of bvec[] that have pages */
+ size_t max_bvec; /* Size of bvec[] */
+ refcount_t refcnt;
+};
+
+struct ceph_databuf *ceph_databuf_alloc(size_t min_bvec, size_t space, gfp_t gfp);
+void ceph_databuf_release(struct ceph_databuf *dbuf);
+int ceph_databuf_append(struct ceph_databuf *dbuf, const void *d, size_t l);
+int ceph_databuf_reserve(struct ceph_databuf *dbuf, size_t space, gfp_t gfp);
+int ceph_databuf_insert_frag(struct ceph_databuf *dbuf, unsigned int ix,
+ size_t len, gfp_t gfp);
+
+static inline struct page *ceph_databuf_page(struct ceph_databuf *dbuf,
+ unsigned int ix)
+{
+ return dbuf->bvec[ix].bv_page;
+}
+
/*
 * Map the page at index @ix in @dbuf for local access; pair with
 * kunmap_local() on the returned address.
 *
 * Note: no trailing semicolon — the original definition ended in ';',
 * which breaks any use of the macro inside a larger expression and
 * produces empty-statement warnings after "p = kmap_ceph_databuf_page(...);".
 */
#define kmap_ceph_databuf_page(dbuf, ix) \
	kmap_local_page(ceph_databuf_page(dbuf, ix))
+
+static inline int ceph_databuf_encode_64(struct ceph_databuf *dbuf, u64 v)
+{
+ __le64 ev = cpu_to_le64(v);
+ return ceph_databuf_append(dbuf, &ev, sizeof(ev));
+}
+static inline int ceph_databuf_encode_32(struct ceph_databuf *dbuf, u32 v)
+{
+ __le32 ev = cpu_to_le32(v);
+ return ceph_databuf_append(dbuf, &ev, sizeof(ev));
+}
+static inline int ceph_databuf_encode_16(struct ceph_databuf *dbuf, u16 v)
+{
+ __le16 ev = cpu_to_le16(v);
+ return ceph_databuf_append(dbuf, &ev, sizeof(ev));
+}
+static inline int ceph_databuf_encode_8(struct ceph_databuf *dbuf, u8 v)
+{
+ return ceph_databuf_append(dbuf, &v, 1);
+}
+static inline int ceph_databuf_encode_string(struct ceph_databuf *dbuf,
+ const char *s, u32 len)
+{
+ int ret = ceph_databuf_encode_32(dbuf, len);
+ if (ret)
+ return ret;
+ if (len)
+ return ceph_databuf_append(dbuf, s, len);
+ return 0;
+}
+
+#endif /* __FS_CEPH_DATABUF_H */
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index f6f11bf9d63e..351d00e9632d 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -117,6 +117,7 @@ struct ceph_messenger {

enum ceph_msg_data_type {
CEPH_MSG_DATA_NONE, /* message contains no data payload */
+ CEPH_MSG_DATA_DATABUF, /* data source/destination is a data buffer */
CEPH_MSG_DATA_PAGES, /* data source/destination is a page array */
CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */
#ifdef CONFIG_BLOCK
@@ -210,7 +211,10 @@ struct ceph_bvec_iter {

struct ceph_msg_data {
enum ceph_msg_data_type type;
+ struct iov_iter iter;
+ bool release_dbuf;
union {
+ struct ceph_databuf *dbuf;
#ifdef CONFIG_BLOCK
struct {
struct ceph_bio_iter bio_pos;
@@ -225,7 +229,6 @@ struct ceph_msg_data {
bool own_pages;
};
struct ceph_pagelist *pagelist;
- struct iov_iter iter;
};
};

@@ -601,6 +604,7 @@ extern void ceph_con_keepalive(struct ceph_connection *con);
extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
unsigned long interval);

+void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf);
void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t offset, bool own_pages);
extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 3dabebbdb5dc..2d8cd45f1c34 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -16,6 +16,7 @@
#include <linux/ceph/msgpool.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>
+#include <linux/ceph/databuf.h>

struct ceph_msg;
struct ceph_snap_context;
@@ -102,6 +103,7 @@ struct ceph_osd {

enum ceph_osd_data_type {
CEPH_OSD_DATA_TYPE_NONE = 0,
+ CEPH_OSD_DATA_TYPE_DATABUF,
CEPH_OSD_DATA_TYPE_PAGES,
CEPH_OSD_DATA_TYPE_PAGELIST,
#ifdef CONFIG_BLOCK
@@ -114,6 +116,7 @@ enum ceph_osd_data_type {
struct ceph_osd_data {
enum ceph_osd_data_type type;
union {
+ struct ceph_databuf *dbuf;
struct {
struct page **pages;
u64 length;
@@ -486,6 +489,9 @@ extern struct ceph_osd_data *osd_req_op_extent_osd_data(
struct ceph_osd_request *osd_req,
unsigned int which);

+extern void osd_req_op_extent_osd_databuf(struct ceph_osd_request *req,
+ unsigned int which,
+ struct ceph_databuf *dbuf);
extern void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *,
unsigned int which,
struct page **pages, u64 length,
@@ -510,6 +516,9 @@ void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req,
unsigned int which, struct iov_iter *iter);

+void osd_req_op_cls_request_databuf(struct ceph_osd_request *req,
+ unsigned int which,
+ struct ceph_databuf *dbuf);
extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
unsigned int which,
struct ceph_pagelist *pagelist);
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 8802a0c0155d..4b2e0b654e45 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -15,4 +15,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
auth_x.o \
ceph_strings.o ceph_hash.o \
pagevec.o snapshot.o string_table.o \
- messenger_v1.o messenger_v2.o
+ messenger_v1.o messenger_v2.o \
+ databuf.o
diff --git a/net/ceph/databuf.c b/net/ceph/databuf.c
new file mode 100644
index 000000000000..cb070cedc5d9
--- /dev/null
+++ b/net/ceph/databuf.c
@@ -0,0 +1,149 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Data container
+ *
+ * Copyright (C) 2023 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells ([email protected])
+ */
+
+#include <linux/export.h>
+#include <linux/gfp.h>
+#include <linux/slab.h>
+#include <linux/uio.h>
+#include <linux/pagemap.h>
+#include <linux/highmem.h>
+#include <linux/ceph/databuf.h>
+
+struct ceph_databuf *ceph_databuf_alloc(size_t min_bvec, size_t space, gfp_t gfp)
+{
+ struct ceph_databuf *dbuf;
+
+ dbuf = kzalloc(sizeof(*dbuf), gfp);
+ if (!dbuf)
+ return NULL;
+
+ min_bvec = max_t(size_t, min_bvec, 16);
+
+ dbuf->bvec = kcalloc(min_bvec, sizeof(struct bio_vec), gfp);
+ if (!dbuf->bvec) {
+ kfree(dbuf);
+ return NULL;
+ }
+
+ dbuf->max_bvec = min_bvec;
+ iov_iter_bvec(&dbuf->iter, ITER_DEST, dbuf->bvec, 0, 0);
+ refcount_set(&dbuf->refcnt, 1);
+
+ if (space) {
+ if (ceph_databuf_reserve(dbuf, space, gfp) < 0) {
+ ceph_databuf_release(dbuf);
+ return NULL;
+ }
+ }
+ return dbuf;
+}
+EXPORT_SYMBOL(ceph_databuf_alloc);
+
+void ceph_databuf_release(struct ceph_databuf *dbuf)
+{
+ size_t i;
+
+ if (!dbuf || !refcount_dec_and_test(&dbuf->refcnt))
+ return;
+
+ for (i = 0; i < dbuf->nr_bvec; i++)
+ __free_page(dbuf->bvec[i].bv_page);
+ kfree(dbuf);
+}
+EXPORT_SYMBOL(ceph_databuf_release);
+
+/*
+ * Expand the bvec[] in the dbuf.
+ */
+static int ceph_databuf_expand(struct ceph_databuf *dbuf, size_t req_bvec,
+ gfp_t gfp)
+{
+ struct bio_vec *bvec = dbuf->bvec, *old = bvec;
+ size_t size, max_bvec;
+
+ max_bvec = roundup_pow_of_two(req_bvec);
+ size = array_size(max_bvec, sizeof(struct bio_vec));
+
+ bvec = krealloc(old, size, gfp);
+ if (!bvec)
+ return -ENOMEM;
+ dbuf->bvec = bvec;
+ dbuf->max_bvec = max_bvec;
+ dbuf->iter.bvec = bvec + (dbuf->iter.bvec - old);
+ return 0;
+}
+
+/* Allocate enough pages for a dbuf to append the given amount
+ * of dbuf without allocating.
+ * Returns: 0 on success, -ENOMEM on error.
+ */
+int ceph_databuf_reserve(struct ceph_databuf *dbuf, size_t add_space,
+ gfp_t gfp)
+{
+ struct bio_vec *bvec = dbuf->bvec;
+ size_t i, req_bvec = DIV_ROUND_UP(dbuf->length + add_space, PAGE_SIZE);
+ int ret;
+
+ if (req_bvec > dbuf->max_bvec) {
+ ret = ceph_databuf_expand(dbuf, req_bvec, gfp);
+ if (ret < 0)
+ return ret;
+ }
+
+ while (dbuf->nr_bvec < req_bvec) {
+ struct page *pages[16];
+ size_t want = min(req_bvec, ARRAY_SIZE(pages)), got;
+
+ memset(&pages, 0, sizeof(pages));
+ got = alloc_pages_bulk_array(gfp, want, pages);
+ if (!got)
+ return -ENOMEM;
+ for (i = 0; i < got; i++)
+ bvec_set_page(&bvec[dbuf->nr_bvec + i], pages[i],
+ PAGE_SIZE, 0);
+ dbuf->iter.count += got * PAGE_SIZE;
+ dbuf->iter.nr_segs += got;
+ dbuf->nr_bvec += got;
+ dbuf->limit = dbuf->nr_bvec * PAGE_SIZE;
+ }
+
+ return 0;
+}
+EXPORT_SYMBOL(ceph_databuf_reserve);
+
+int ceph_databuf_append(struct ceph_databuf *dbuf, const void *buf, size_t len)
+{
+ if (dbuf->limit - dbuf->length > len &&
+ ceph_databuf_reserve(dbuf, len, GFP_NOIO) < 0)
+ return -ENOMEM;
+
+ if (copy_to_iter(buf, len, &dbuf->iter) != len)
+ return -EFAULT;
+ dbuf->length += len;
+ return 0;
+}
+EXPORT_SYMBOL(ceph_databuf_append);
+
+/*
+ * Allocate a fragment and insert it into the buffer at the specified index.
+ */
+int ceph_databuf_insert_frag(struct ceph_databuf *dbuf, unsigned int ix,
+ size_t len, gfp_t gfp)
+{
+ struct bio_vec *bv = &dbuf->bvec[ix];
+
+ bv->bv_page = alloc_page(gfp);
+ if (!bv->bv_page)
+ return -ENOMEM;
+ bv->bv_offset = 0;
+ bv->bv_len = len;
+
+ if (dbuf->nr_bvec == ix)
+ dbuf->nr_bvec = ix + 1;
+ return 0;
+}
+EXPORT_SYMBOL(ceph_databuf_insert_frag);
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 6cfc6b69052f..4c8899c26e1e 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1872,7 +1872,9 @@ static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)

static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{
- if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
+ if (data->release_dbuf) {
+ ceph_databuf_release(data->dbuf);
+ } else if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
int num_pages = calc_pages_for(data->offset, data->length);
ceph_release_page_vector(data->pages, num_pages);
} else if (data->type == CEPH_MSG_DATA_PAGELIST) {
@@ -1880,6 +1882,24 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
}
}

+void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf)
+{
+ struct ceph_msg_data *data;
+
+ BUG_ON(!dbuf);
+ BUG_ON(!dbuf->length);
+
+ data = ceph_msg_data_add(msg);
+ data->type = CEPH_MSG_DATA_ITER;
+ data->dbuf = dbuf;
+ refcount_inc(&dbuf->refcnt);
+
+ iov_iter_bvec(&data->iter, ITER_SOURCE,
+ dbuf->bvec, dbuf->nr_bvec, dbuf->length);
+ msg->data_length += dbuf->length;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_databuf);
+
void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t offset, bool own_pages)
{


2023-08-04 14:39:51

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 12/18] ceph: Convert some page arrays to ceph_databuf

---
drivers/block/rbd.c | 12 +++---
include/linux/ceph/osd_client.h | 3 ++
net/ceph/osd_client.c | 74 +++++++++++++++++++++------------
3 files changed, 55 insertions(+), 34 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 1756973b696f..950b63eb41de 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -2108,7 +2108,7 @@ static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,

static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
{
- struct page **pages;
+ struct ceph_databuf *dbuf;

/*
* The response data for a STAT call consists of:
@@ -2118,14 +2118,12 @@ static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
* le32 tv_nsec;
* } mtime;
*/
- pages = ceph_alloc_page_vector(1, GFP_NOIO);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
+ dbuf = ceph_databuf_alloc(1, 8 + sizeof(struct ceph_timespec), GFP_NOIO);
+ if (!dbuf)
+ return -ENOMEM;

osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
- osd_req_op_raw_data_in_pages(osd_req, which, pages,
- 8 + sizeof(struct ceph_timespec),
- 0, false, true);
+ osd_req_op_raw_data_in_databuf(osd_req, which, dbuf);
return 0;
}

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index fd91c5d92600..fec78550d5ce 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -483,6 +483,9 @@ extern void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *,
struct page **pages, u64 length,
u32 offset, bool pages_from_pool,
bool own_pages);
+void osd_req_op_raw_data_in_databuf(struct ceph_osd_request *osd_req,
+ unsigned int which,
+ struct ceph_databuf *databuf);
extern void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *,
unsigned int which,
struct ceph_pagelist *pagelist);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 6bbd9fe780c3..c83ae9bb335e 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -181,6 +181,17 @@ osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

+void osd_req_op_raw_data_in_databuf(struct ceph_osd_request *osd_req,
+ unsigned int which,
+ struct ceph_databuf *dbuf)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_raw_data_in(osd_req, which);
+ ceph_osd_databuf_init(osd_data, dbuf);
+}
+EXPORT_SYMBOL(osd_req_op_raw_data_in_databuf);
+
void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
unsigned int which, struct page **pages,
u64 length, u32 offset,
@@ -280,17 +291,16 @@ void osd_req_op_cls_request_data_pagelist(
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

-static void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
- unsigned int which, struct page **pages, u64 length,
- u32 offset, bool pages_from_pool, bool own_pages)
+static void osd_req_op_cls_request_data_iter(
+ struct ceph_osd_request *osd_req,
+ unsigned int which, struct iov_iter *iter)
{
struct ceph_osd_data *osd_data;

osd_data = osd_req_op_data(osd_req, which, cls, request_data);
- ceph_osd_data_pages_init(osd_data, pages, length, offset,
- pages_from_pool, own_pages);
- osd_req->r_ops[which].cls.indata_len += length;
- osd_req->r_ops[which].indata_len += length;
+ ceph_osd_iter_init(osd_data, iter);
+ osd_req->r_ops[which].cls.indata_len += iter->count;
+ osd_req->r_ops[which].indata_len += iter->count;
}

void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req,
@@ -3017,10 +3027,12 @@ static void linger_commit_cb(struct ceph_osd_request *req)
if (!lreq->is_watch) {
struct ceph_osd_data *osd_data =
osd_req_op_data(req, 0, notify, response_data);
- void *p = page_address(osd_data->pages[0]);
+ void *p;

WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
- osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+ osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
+
+ p = kmap_ceph_databuf_page(osd_data->dbuf, 0);

/* make note of the notify_id */
if (req->r_ops[0].outdata_len >= sizeof(u64)) {
@@ -3030,6 +3042,8 @@ static void linger_commit_cb(struct ceph_osd_request *req)
} else {
dout("lreq %p no notify_id\n", lreq);
}
+
+ kunmap_local(p);
}

out:
@@ -5032,7 +5046,7 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
u32 *num_watchers)
{
struct ceph_osd_request *req;
- struct page **pages;
+ struct ceph_databuf *dbuf;
int ret;

req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
@@ -5043,16 +5057,16 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = CEPH_OSD_FLAG_READ;

- pages = ceph_alloc_page_vector(1, GFP_NOIO);
- if (IS_ERR(pages)) {
- ret = PTR_ERR(pages);
+ dbuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOIO);
+ if (!dbuf) {
+ ret = -ENOMEM;
goto out_put_req;
}

osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
- ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
- response_data),
- pages, PAGE_SIZE, 0, false, true);
+ ceph_osd_databuf_init(osd_req_op_data(req, 0, list_watchers,
+ response_data),
+ dbuf);

ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
@@ -5061,10 +5075,11 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
ceph_osdc_start_request(osdc, req);
ret = ceph_osdc_wait_request(osdc, req);
if (ret >= 0) {
- void *p = page_address(pages[0]);
+ void *p = kmap_ceph_databuf_page(dbuf, 0);
void *const end = p + req->r_ops[0].outdata_len;

ret = decode_watchers(&p, end, watchers, num_watchers);
+ kunmap(p);
}

out_put_req:
@@ -5111,6 +5126,8 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
struct ceph_databuf *response)
{
struct ceph_osd_request *req;
+ struct iov_iter iter;
+ struct bio_vec bv;
int ret;

if (req_len > PAGE_SIZE)
@@ -5128,9 +5145,11 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
if (ret)
goto out_put_req;

- if (req_page)
- osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
- 0, false, false);
+ if (req_page) {
+ bvec_set_page(&bv, req_page, 0, req_len);
+ iov_iter_bvec(&iter, ITER_SOURCE, &bv, 1, req_len);
+ osd_req_op_cls_request_data_iter(req, 0, &iter);
+ }
if (response)
osd_req_op_cls_response_databuf(req, 0, response);

@@ -5285,12 +5304,12 @@ int osd_req_op_copy_from_init(struct ceph_osd_request *req,
u8 copy_from_flags)
{
struct ceph_osd_req_op *op;
- struct page **pages;
+ struct ceph_databuf *dbuf;
void *p, *end;

- pages = ceph_alloc_page_vector(1, GFP_KERNEL);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
+ dbuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_KERNEL);
+ if (!dbuf)
+ return -ENOMEM;

op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
dst_fadvise_flags);
@@ -5299,16 +5318,17 @@ int osd_req_op_copy_from_init(struct ceph_osd_request *req,
op->copy_from.flags = copy_from_flags;
op->copy_from.src_fadvise_flags = src_fadvise_flags;

- p = page_address(pages[0]);
+ p = kmap_ceph_databuf_page(dbuf, 0);
end = p + PAGE_SIZE;
ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
encode_oloc(&p, end, src_oloc);
ceph_encode_32(&p, truncate_seq);
ceph_encode_64(&p, truncate_size);
op->indata_len = PAGE_SIZE - (end - p);
+ dbuf->length = op->indata_len;
+ kunmap_local(p);

- ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
- op->indata_len, 0, false, true);
+ ceph_osd_databuf_init(&op->copy_from.osd_data, dbuf);
return 0;
}
EXPORT_SYMBOL(osd_req_op_copy_from_init);


2023-08-04 14:49:00

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 06/18] ceph: Change ceph_osdc_call()'s reply to a ceph_databuf

Change the type of ceph_osdc_call()'s reply to a ceph_databuf struct rather
than a list of pages.

Signed-off-by: David Howells <[email protected]>
---
drivers/block/rbd.c | 134 ++++++++++++++++++--------------
include/linux/ceph/osd_client.h | 5 +-
net/ceph/cls_lock_client.c | 40 +++++-----
net/ceph/osd_client.c | 64 +++++++++++++--
4 files changed, 158 insertions(+), 85 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 2a161b03dd7a..971fa4a581cf 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1823,9 +1823,8 @@ static int __rbd_object_map_load(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
CEPH_DEFINE_OID_ONSTACK(oid);
- struct page **pages;
- void *p, *end;
- size_t reply_len;
+ struct ceph_databuf *reply;
+ void *p, *q, *end;
u64 num_objects;
u64 object_map_bytes;
u64 object_map_size;
@@ -1839,48 +1838,57 @@ static int __rbd_object_map_load(struct rbd_device *rbd_dev)
object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
BITS_PER_BYTE);
num_pages = calc_pages_for(0, object_map_bytes) + 1;
- pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
- if (IS_ERR(pages))
- return PTR_ERR(pages);

- reply_len = num_pages * PAGE_SIZE;
+ reply = ceph_databuf_alloc(num_pages, num_pages * PAGE_SIZE,
+ GFP_KERNEL);
+ if (!reply)
+ return -ENOMEM;
+
rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
"rbd", "object_map_load", CEPH_OSD_FLAG_READ,
- NULL, 0, pages, &reply_len);
+ NULL, 0, reply);
if (ret)
goto out;

- p = page_address(pages[0]);
- end = p + min(reply_len, (size_t)PAGE_SIZE);
- ret = decode_object_map_header(&p, end, &object_map_size);
+ p = kmap_ceph_databuf_page(reply, 0);
+ end = p + min(reply->iter.count, (size_t)PAGE_SIZE);
+ q = p;
+ ret = decode_object_map_header(&q, end, &object_map_size);
if (ret)
- goto out;
+ goto out_unmap;

if (object_map_size != num_objects) {
rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
object_map_size, num_objects);
ret = -EINVAL;
- goto out;
+ goto out_unmap;
}
+ iov_iter_advance(&reply->iter, q - p);

- if (offset_in_page(p) + object_map_bytes > reply_len) {
+ if (object_map_bytes > reply->iter.count) {
ret = -EINVAL;
- goto out;
+ goto out_unmap;
}

rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
if (!rbd_dev->object_map) {
ret = -ENOMEM;
- goto out;
+ goto out_unmap;
}

rbd_dev->object_map_size = object_map_size;
- ceph_copy_from_page_vector(pages, rbd_dev->object_map,
- offset_in_page(p), object_map_bytes);

+ ret = -EIO;
+ if (copy_from_iter(rbd_dev->object_map, object_map_bytes,
+ &reply->iter) != object_map_bytes)
+ goto out_unmap;
+
+ ret = 0;
+out_unmap:
+ kunmap_local(p);
out:
- ceph_release_page_vector(pages, num_pages);
+ ceph_databuf_release(reply);
return ret;
}

@@ -1949,6 +1957,7 @@ static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
{
struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
struct ceph_osd_data *osd_data;
+ struct ceph_databuf *dbuf;
u64 objno;
u8 state, new_state, current_state;
bool has_current_state;
@@ -1968,9 +1977,10 @@ static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
*/
rbd_assert(osd_req->r_num_ops == 2);
osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
- rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
+ rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_DATABUF);
+ dbuf = osd_data->dbuf;

- p = page_address(osd_data->pages[0]);
+ p = kmap_ceph_databuf_page(dbuf, 0);
objno = ceph_decode_64(&p);
rbd_assert(objno == obj_req->ex.oe_objno);
rbd_assert(ceph_decode_64(&p) == objno + 1);
@@ -1978,6 +1988,7 @@ static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
has_current_state = ceph_decode_8(&p);
if (has_current_state)
current_state = ceph_decode_8(&p);
+ kunmap_local(p);

spin_lock(&rbd_dev->object_map_lock);
state = __rbd_object_map_get(rbd_dev, objno);
@@ -2017,7 +2028,7 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req,
int which, u64 objno, u8 new_state,
const u8 *current_state)
{
- struct page **pages;
+ struct ceph_databuf *dbuf;
void *p, *start;
int ret;

@@ -2025,11 +2036,11 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req,
if (ret)
return ret;

- pages = ceph_alloc_page_vector(1, GFP_NOIO);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
+ dbuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOIO);
+ if (!dbuf)
+ return -ENOMEM;

- p = start = page_address(pages[0]);
+ p = start = kmap_ceph_databuf_page(dbuf, 0);
ceph_encode_64(&p, objno);
ceph_encode_64(&p, objno + 1);
ceph_encode_8(&p, new_state);
@@ -2039,9 +2050,11 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req,
} else {
ceph_encode_8(&p, 0);
}
+ kunmap_local(p);
+ dbuf->length = p - start;

- osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
- false, true);
+ osd_req_op_cls_request_databuf(req, which, dbuf);
+ ceph_databuf_release(dbuf);
return 0;
}

@@ -4613,8 +4626,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
size_t inbound_size)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+ struct ceph_databuf *reply;
struct page *req_page = NULL;
- struct page *reply_page;
int ret;

/*
@@ -4635,8 +4648,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
memcpy(page_address(req_page), outbound, outbound_size);
}

- reply_page = alloc_page(GFP_KERNEL);
- if (!reply_page) {
+ reply = ceph_databuf_alloc(1, inbound_size, GFP_KERNEL);
+ if (!reply) {
if (req_page)
__free_page(req_page);
return -ENOMEM;
@@ -4644,15 +4657,16 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,

ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
CEPH_OSD_FLAG_READ, req_page, outbound_size,
- &reply_page, &inbound_size);
+ reply);
if (!ret) {
- memcpy(inbound, page_address(reply_page), inbound_size);
- ret = inbound_size;
+ ret = reply->length;
+ if (copy_from_iter(inbound, reply->length, &reply->iter) != ret)
+ ret = -EIO;
}

if (req_page)
__free_page(req_page);
- __free_page(reply_page);
+ ceph_databuf_release(reply);
return ret;
}

@@ -5615,7 +5629,7 @@ static int decode_parent_image_spec(void **p, void *end,

static int __get_parent_info(struct rbd_device *rbd_dev,
struct page *req_page,
- struct page *reply_page,
+ struct ceph_databuf *reply,
struct parent_image_info *pii)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
@@ -5625,27 +5639,29 @@ static int __get_parent_info(struct rbd_device *rbd_dev,

ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
"rbd", "parent_get", CEPH_OSD_FLAG_READ,
- req_page, sizeof(u64), &reply_page, &reply_len);
+ req_page, sizeof(u64), reply);
if (ret)
return ret == -EOPNOTSUPP ? 1 : ret;

- p = page_address(reply_page);
+ p = kmap_ceph_databuf_page(reply, 0);
end = p + reply_len;
ret = decode_parent_image_spec(&p, end, pii);
+ kunmap_local(p);
if (ret)
return ret;

ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
"rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
- req_page, sizeof(u64), &reply_page, &reply_len);
+ req_page, sizeof(u64), reply);
if (ret)
return ret;

- p = page_address(reply_page);
+ p = kmap_ceph_databuf_page(reply, 0);
end = p + reply_len;
ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
if (pii->has_overlap)
ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
+ kunmap_local(p);

return 0;

@@ -5658,25 +5674,25 @@ static int __get_parent_info(struct rbd_device *rbd_dev,
*/
static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
struct page *req_page,
- struct page *reply_page,
+ struct ceph_databuf *reply,
struct parent_image_info *pii)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
- size_t reply_len = PAGE_SIZE;
void *p, *end;
int ret;

ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
"rbd", "get_parent", CEPH_OSD_FLAG_READ,
- req_page, sizeof(u64), &reply_page, &reply_len);
+ req_page, sizeof(u64), reply);
if (ret)
return ret;

- p = page_address(reply_page);
- end = p + reply_len;
+ p = kmap_ceph_databuf_page(reply, 0);
+ end = p + reply->length;
ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
if (IS_ERR(pii->image_id)) {
+ kunmap_local(p);
ret = PTR_ERR(pii->image_id);
pii->image_id = NULL;
return ret;
@@ -5684,6 +5700,7 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
pii->has_overlap = true;
ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
+ kunmap_local(p);

return 0;

@@ -5694,29 +5711,30 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
static int get_parent_info(struct rbd_device *rbd_dev,
struct parent_image_info *pii)
{
- struct page *req_page, *reply_page;
+ struct ceph_databuf *reply;
+ struct page *req_page;
void *p;
- int ret;
+ int ret = -ENOMEM;

req_page = alloc_page(GFP_KERNEL);
if (!req_page)
- return -ENOMEM;
+ goto out;

- reply_page = alloc_page(GFP_KERNEL);
- if (!reply_page) {
- __free_page(req_page);
- return -ENOMEM;
- }
+ reply = ceph_databuf_alloc(1, PAGE_SIZE, GFP_KERNEL);
+ if (!reply)
+ goto out_free;

- p = page_address(req_page);
+ p = kmap_local_page(req_page);
ceph_encode_64(&p, rbd_dev->spec->snap_id);
- ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
+ kunmap_local(p);
+ ret = __get_parent_info(rbd_dev, req_page, reply, pii);
if (ret > 0)
- ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
- pii);
+ ret = __get_parent_info_legacy(rbd_dev, req_page, reply, pii);

+ ceph_databuf_release(reply);
+out_free:
__free_page(req_page);
- __free_page(reply_page);
+out:
return ret;
}

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 2d8cd45f1c34..0e008837dac1 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -531,6 +531,9 @@ void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
unsigned int which,
struct bio_vec *bvecs, u32 num_bvecs,
u32 bytes);
+void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req,
+ unsigned int which,
+ struct ceph_databuf *dbuf);
extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
unsigned int which,
struct page **pages, u64 length,
@@ -605,7 +608,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
const char *class, const char *method,
unsigned int flags,
struct page *req_page, size_t req_len,
- struct page **resp_pages, size_t *resp_len);
+ struct ceph_databuf *response);

/* watch/notify */
struct ceph_osd_linger_request *
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
index 66136a4c1ce7..e2f508704c29 100644
--- a/net/ceph/cls_lock_client.c
+++ b/net/ceph/cls_lock_client.c
@@ -74,7 +74,7 @@ int ceph_cls_lock(struct ceph_osd_client *osdc,
__func__, lock_name, type, cookie, tag, desc, flags);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock",
CEPH_OSD_FLAG_WRITE, lock_op_page,
- lock_op_buf_size, NULL, NULL);
+ lock_op_buf_size, NULL);

dout("%s: status %d\n", __func__, ret);
__free_page(lock_op_page);
@@ -124,7 +124,7 @@ int ceph_cls_unlock(struct ceph_osd_client *osdc,
dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock",
CEPH_OSD_FLAG_WRITE, unlock_op_page,
- unlock_op_buf_size, NULL, NULL);
+ unlock_op_buf_size, NULL);

dout("%s: status %d\n", __func__, ret);
__free_page(unlock_op_page);
@@ -179,7 +179,7 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
cookie, ENTITY_NAME(*locker));
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock",
CEPH_OSD_FLAG_WRITE, break_op_page,
- break_op_buf_size, NULL, NULL);
+ break_op_buf_size, NULL);

dout("%s: status %d\n", __func__, ret);
__free_page(break_op_page);
@@ -230,7 +230,7 @@ int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
__func__, lock_name, type, old_cookie, tag, new_cookie);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "set_cookie",
CEPH_OSD_FLAG_WRITE, cookie_op_page,
- cookie_op_buf_size, NULL, NULL);
+ cookie_op_buf_size, NULL);

dout("%s: status %d\n", __func__, ret);
__free_page(cookie_op_page);
@@ -337,10 +337,10 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
char *lock_name, u8 *type, char **tag,
struct ceph_locker **lockers, u32 *num_lockers)
{
+ struct ceph_databuf *reply;
int get_info_op_buf_size;
int name_len = strlen(lock_name);
- struct page *get_info_op_page, *reply_page;
- size_t reply_len = PAGE_SIZE;
+ struct page *get_info_op_page;
void *p, *end;
int ret;

@@ -353,8 +353,8 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
if (!get_info_op_page)
return -ENOMEM;

- reply_page = alloc_page(GFP_NOIO);
- if (!reply_page) {
+ reply = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOIO);
+ if (!reply) {
__free_page(get_info_op_page);
return -ENOMEM;
}
@@ -370,18 +370,19 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
dout("%s lock_name %s\n", __func__, lock_name);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info",
CEPH_OSD_FLAG_READ, get_info_op_page,
- get_info_op_buf_size, &reply_page, &reply_len);
+ get_info_op_buf_size, reply);

dout("%s: status %d\n", __func__, ret);
if (ret >= 0) {
- p = page_address(reply_page);
- end = p + reply_len;
+ p = kmap_ceph_databuf_page(reply, 0);
+ end = p + reply->length;

ret = decode_lockers(&p, end, type, tag, lockers, num_lockers);
+ kunmap_local(p);
}

__free_page(get_info_op_page);
- __free_page(reply_page);
+ ceph_databuf_release(reply);
return ret;
}
EXPORT_SYMBOL(ceph_cls_lock_info);
@@ -389,11 +390,11 @@ EXPORT_SYMBOL(ceph_cls_lock_info);
int ceph_cls_assert_locked(struct ceph_osd_request *req, int which,
char *lock_name, u8 type, char *cookie, char *tag)
{
+ struct ceph_databuf *dbuf;
int assert_op_buf_size;
int name_len = strlen(lock_name);
int cookie_len = strlen(cookie);
int tag_len = strlen(tag);
- struct page **pages;
void *p, *end;
int ret;

@@ -408,11 +409,11 @@ int ceph_cls_assert_locked(struct ceph_osd_request *req, int which,
if (ret)
return ret;

- pages = ceph_alloc_page_vector(1, GFP_NOIO);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
+ dbuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOIO);
+ if (!dbuf)
+ return -ENOMEM;

- p = page_address(pages[0]);
+ p = kmap_ceph_databuf_page(dbuf, 0);
end = p + assert_op_buf_size;

/* encode cls_lock_assert_op struct */
@@ -422,10 +423,11 @@ int ceph_cls_assert_locked(struct ceph_osd_request *req, int which,
ceph_encode_8(&p, type);
ceph_encode_string(&p, end, cookie, cookie_len);
ceph_encode_string(&p, end, tag, tag_len);
+ kunmap_local(p);
WARN_ON(p != end);
+ dbuf->length = assert_op_buf_size;

- osd_req_op_cls_request_data_pages(req, which, pages, assert_op_buf_size,
- 0, false, true);
+ osd_req_op_cls_request_databuf(req, which, dbuf);
return 0;
}
EXPORT_SYMBOL(ceph_cls_assert_locked);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index e3152e21418f..7ce3aef55755 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -178,6 +178,16 @@ static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
osd_data->iter = *iter;
}

+/*
+ * Consumes a ref on @dbuf.
+ */
+static void ceph_osd_databuf_init(struct ceph_osd_data *osd_data,
+ struct ceph_databuf *dbuf)
+{
+ ceph_osd_iter_init(osd_data, &dbuf->iter);
+ osd_data->dbuf = dbuf;
+}
+
static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
@@ -207,6 +217,17 @@ void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

+void osd_req_op_extent_osd_databuf(struct ceph_osd_request *osd_req,
+ unsigned int which,
+ struct ceph_databuf *dbuf)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_databuf_init(osd_data, dbuf);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_databuf);
+
void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
unsigned int which, struct page **pages,
u64 length, u32 offset,
@@ -297,6 +318,19 @@ static void osd_req_op_cls_request_info_pagelist(
ceph_osd_data_pagelist_init(osd_data, pagelist);
}

+void osd_req_op_cls_request_databuf(struct ceph_osd_request *osd_req,
+ unsigned int which,
+ struct ceph_databuf *dbuf)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+ ceph_osd_databuf_init(osd_data, dbuf);
+ osd_req->r_ops[which].cls.indata_len += dbuf->length;
+ osd_req->r_ops[which].indata_len += dbuf->length;
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_databuf);
+
void osd_req_op_cls_request_data_pagelist(
struct ceph_osd_request *osd_req,
unsigned int which, struct ceph_pagelist *pagelist)
@@ -342,6 +376,17 @@ void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);

+void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req,
+ unsigned int which,
+ struct ceph_databuf *dbuf)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, response_data);
+ ceph_osd_databuf_init(osd_data, dbuf);
+}
+EXPORT_SYMBOL(osd_req_op_cls_response_databuf);
+
void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
unsigned int which, struct page **pages, u64 length,
u32 offset, bool pages_from_pool, bool own_pages)
@@ -5162,7 +5207,11 @@ EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
* Execute an OSD class method on an object.
*
* @flags: CEPH_OSD_FLAG_*
- * @resp_len: in/out param for reply length
+ * @response: Pointer to the storage descriptor for the reply or NULL.
+ *
+ * The size of the response buffer is set by the caller in @response->limit and
+ * the size of the response obtained is set in @response->length and
+ * @response->iter.count.
*/
int ceph_osdc_call(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
@@ -5170,7 +5219,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
const char *class, const char *method,
unsigned int flags,
struct page *req_page, size_t req_len,
- struct page **resp_pages, size_t *resp_len)
+ struct ceph_databuf *response)
{
struct ceph_osd_request *req;
int ret;
@@ -5193,9 +5242,8 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
if (req_page)
osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
0, false, false);
- if (resp_pages)
- osd_req_op_cls_response_data_pages(req, 0, resp_pages,
- *resp_len, 0, false, false);
+ if (response)
+ osd_req_op_cls_response_databuf(req, 0, response);

ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
@@ -5205,8 +5253,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
ret = ceph_osdc_wait_request(osdc, req);
if (ret >= 0) {
ret = req->r_ops[0].rval;
- if (resp_pages)
- *resp_len = req->r_ops[0].outdata_len;
+ if (response) {
+ response->length = req->r_ops[0].outdata_len;
+ response->iter.count = response->length;
+ }
}

out_put_req:


2023-08-04 15:07:17

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 15/18] ceph: Convert ceph_osdc_notify() reply to ceph_databuf

Convert the reply buffer of ceph_osdc_notify() to ceph_databuf rather than
an array of pages.

Signed-off-by: David Howells <[email protected]>
---
drivers/block/rbd.c | 33 ++++++++++++++++++++-------------
include/linux/ceph/osd_client.h | 7 ++-----
net/ceph/osd_client.c | 17 ++++-------------
3 files changed, 26 insertions(+), 31 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 950b63eb41de..7a624e75ac7a 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3455,8 +3455,7 @@ static void rbd_unlock(struct rbd_device *rbd_dev)

static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
enum rbd_notify_op notify_op,
- struct page ***preply_pages,
- size_t *preply_len)
+ struct ceph_databuf *reply)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_client_id cid = rbd_get_cid(rbd_dev);
@@ -3474,13 +3473,13 @@ static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,

return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, buf, buf_size,
- RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
+ RBD_NOTIFY_TIMEOUT, reply);
}

static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
enum rbd_notify_op notify_op)
{
- __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
+ __rbd_notify_op_lock(rbd_dev, notify_op, NULL);
}

static void rbd_notify_acquired_lock(struct work_struct *work)
@@ -3501,23 +3500,26 @@ static void rbd_notify_released_lock(struct work_struct *work)

static int rbd_request_lock(struct rbd_device *rbd_dev)
{
- struct page **reply_pages;
- size_t reply_len;
+ struct ceph_databuf *reply;
bool lock_owner_responded = false;
int ret;

dout("%s rbd_dev %p\n", __func__, rbd_dev);

- ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
- &reply_pages, &reply_len);
+ reply = ceph_databuf_alloc(0, 0, GFP_KERNEL);
+ if (!reply)
+ return -ENOMEM;
+
+ ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK, reply);
if (ret && ret != -ETIMEDOUT) {
rbd_warn(rbd_dev, "failed to request lock: %d", ret);
goto out;
}

- if (reply_len > 0 && reply_len <= PAGE_SIZE) {
- void *p = page_address(reply_pages[0]);
- void *const end = p + reply_len;
+ if (reply->length > 0 && reply->length <= PAGE_SIZE) {
+ void *s = kmap_ceph_databuf_page(reply, 0);
+ void *p = s;
+ void *const end = p + reply->length;
u32 n;

ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
@@ -3529,10 +3531,12 @@ static int rbd_request_lock(struct rbd_device *rbd_dev)
p += 8 + 8; /* skip gid and cookie */

ceph_decode_32_safe(&p, end, len, e_inval);
 if (!len)
 continue;

if (lock_owner_responded) {
+ kunmap_local(s);
rbd_warn(rbd_dev,
"duplicate lock owners detected");
ret = -EIO;
@@ -3543,6 +3547,7 @@ static int rbd_request_lock(struct rbd_device *rbd_dev)
ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
&struct_v, &len);
if (ret) {
+ kunmap_local(s);
rbd_warn(rbd_dev,
"failed to decode ResponseMessage: %d",
ret);
@@ -3551,6 +3556,8 @@ static int rbd_request_lock(struct rbd_device *rbd_dev)

ret = ceph_decode_32(&p);
}
+
+ kunmap_local(s);
}

if (!lock_owner_responded) {
@@ -3559,7 +3566,7 @@ static int rbd_request_lock(struct rbd_device *rbd_dev)
}

out:
- ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+ ceph_databuf_release(reply);
return ret;

e_inval:
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 83c3073c44bb..3099f923c241 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -332,9 +332,7 @@ struct ceph_osd_linger_request {

struct ceph_databuf *request_pl;
struct ceph_databuf *notify_id_buf;
-
- struct page ***preply_pages;
- size_t *preply_len;
+ struct ceph_databuf *reply;
};

struct ceph_watch_item {
@@ -587,8 +585,7 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
void *payload,
u32 payload_len,
u32 timeout,
- struct page ***preply_pages,
- size_t *preply_len);
+ struct ceph_databuf *reply);
int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
struct ceph_osd_linger_request *lreq);
int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 8cbe06d2e16d..0fe16fdc760f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -4530,7 +4530,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
msg->num_data_items ? &msg->data[0] : NULL;

if (data) {
- if (lreq->preply_pages) {
+ if (lreq->reply) {
WARN_ON(data->type !=
CEPH_MSG_DATA_PAGES);
*lreq->preply_pages = data->pages;
@@ -4828,10 +4828,7 @@ EXPORT_SYMBOL(ceph_osdc_notify_ack);
/*
* @timeout: in seconds
*
- * @preply_{pages,len} are initialized both on success and error.
- * The caller is responsible for:
- *
- * ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
+ * @reply should be an empty ceph_databuf.
*/
int ceph_osdc_notify(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
@@ -4839,17 +4836,12 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
void *payload,
u32 payload_len,
u32 timeout,
- struct page ***preply_pages,
- size_t *preply_len)
+ struct ceph_databuf *reply)
{
struct ceph_osd_linger_request *lreq;
int ret;

WARN_ON(!timeout);
- if (preply_pages) {
- *preply_pages = NULL;
- *preply_len = 0;
- }

lreq = linger_alloc(osdc);
if (!lreq)
@@ -4877,8 +4869,7 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
goto out_put_lreq;
}

- lreq->preply_pages = preply_pages;
- lreq->preply_len = preply_len;
+ lreq->reply = reply;

ceph_oid_copy(&lreq->t.base_oid, oid);
ceph_oloc_copy(&lreq->t.base_oloc, oloc);


2023-08-04 15:07:26

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 01/18] iov_iter: Add function to see if buffer is all zeros

Add a function to scan a buffer and indicate if all of the bytes contained
therein are zero.

Signed-off-by: David Howells <[email protected]>
---
include/linux/uio.h | 1 +
lib/iov_iter.c | 22 ++++++++++++++++++++++
2 files changed, 23 insertions(+)

diff --git a/include/linux/uio.h b/include/linux/uio.h
index ff81e5ccaef2..49de7b8a8890 100644
--- a/include/linux/uio.h
+++ b/include/linux/uio.h
@@ -264,6 +264,7 @@ static inline bool iov_iter_is_copy_mc(const struct iov_iter *i)
#endif

size_t iov_iter_zero(size_t bytes, struct iov_iter *);
+bool iov_iter_is_zero(const struct iov_iter *i, size_t count);
bool iov_iter_is_aligned(const struct iov_iter *i, unsigned addr_mask,
unsigned len_mask);
unsigned long iov_iter_alignment(const struct iov_iter *i);
diff --git a/lib/iov_iter.c b/lib/iov_iter.c
index b667b1e2f688..ec9e3e1a11a9 100644
--- a/lib/iov_iter.c
+++ b/lib/iov_iter.c
@@ -566,6 +566,28 @@ size_t iov_iter_zero(size_t bytes, struct iov_iter *i)
}
EXPORT_SYMBOL(iov_iter_zero);

+/**
+ * iov_iter_is_zero - Return true if the buffer is entirely zeroed
+ * @i: The iterator describing the buffer
+ * @count: Amount of buffer to scan
+ *
+ * Scans the specified amount of the supplied buffer and returns true if only
+ * zero bytes are found therein and false otherwise.
+ */
+bool iov_iter_is_zero(const struct iov_iter *i, size_t count)
+{
+ struct iov_iter j = *i, *pj = &j;
+ void *p;
+
+ iterate_and_advance(pj, count, base, len, count,
+ ({ p = memchr_inv(base, 0, len); p ? p - base : len; }),
+ ({ p = memchr_inv(base, 0, len); p ? p - base : len; })
+ )
+
+ return !count;
+}
+EXPORT_SYMBOL(iov_iter_is_zero);
+
size_t copy_page_from_iter_atomic(struct page *page, unsigned offset, size_t bytes,
struct iov_iter *i)
{


2023-08-04 15:25:40

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 08/18] ceph: Remove osd_req_op_cls_response_data_pages()

Remove osd_req_op_cls_response_data_pages() as it's no longer used.

Signed-off-by: David Howells <[email protected]>
---
include/linux/ceph/osd_client.h | 5 -----
net/ceph/osd_client.c | 12 ------------
2 files changed, 17 deletions(-)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index e1533f3314ad..0b02e272acc2 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -529,11 +529,6 @@ void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_databuf *dbuf);
-extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
- unsigned int which,
- struct page **pages, u64 length,
- u32 offset, bool pages_from_pool,
- bool own_pages);
int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
const char *class, const char *method);
extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 2ba6f2ce5fb6..aa9d07221149 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -386,18 +386,6 @@ void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_cls_response_databuf);

-void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
- unsigned int which, struct page **pages, u64 length,
- u32 offset, bool pages_from_pool, bool own_pages)
-{
- struct ceph_osd_data *osd_data;
-
- osd_data = osd_req_op_data(osd_req, which, cls, response_data);
- ceph_osd_data_pages_init(osd_data, pages, length, offset,
- pages_from_pool, own_pages);
-}
-EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
-
static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
switch (osd_data->type) {


2023-08-04 16:40:21

by David Howells

[permalink] [raw]
Subject: [RFC PATCH 17/18] ceph: Remove CEPH_MSG_DATA_PAGES and its helpers

---
include/linux/ceph/messenger.h | 26 ++-------
net/ceph/messenger.c | 98 +---------------------------------
net/ceph/osd_client.c | 2 -
3 files changed, 5 insertions(+), 121 deletions(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index a2489e266bff..f48657eef648 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -118,23 +118,14 @@ struct ceph_messenger {
enum ceph_msg_data_type {
CEPH_MSG_DATA_NONE, /* message contains no data payload */
CEPH_MSG_DATA_DATABUF, /* data source/destination is a data buffer */
- CEPH_MSG_DATA_PAGES, /* data source/destination is a page array */
CEPH_MSG_DATA_ITER, /* data source/destination is an iov_iter */
};

struct ceph_msg_data {
enum ceph_msg_data_type type;
- struct iov_iter iter;
bool release_dbuf;
- union {
- struct ceph_databuf *dbuf;
- struct {
- struct page **pages;
- size_t length; /* total # bytes */
- unsigned int offset; /* first page */
- bool own_pages;
- };
- };
+ struct iov_iter iter;
+ struct ceph_databuf *dbuf;
};

struct ceph_msg_data_cursor {
@@ -144,17 +135,8 @@ struct ceph_msg_data_cursor {
size_t resid; /* bytes not yet consumed */
int sr_resid; /* residual sparse_read len */
bool need_crc; /* crc update needed */
- union {
- struct { /* pages */
- unsigned int page_offset; /* offset in page */
- unsigned short page_index; /* index in array */
- unsigned short page_count; /* pages in array */
- };
- struct {
- struct iov_iter iov_iter;
- unsigned int lastlen;
- };
- };
+ struct iov_iter iov_iter;
+ unsigned int lastlen;
};

/*
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 5b28c27858b2..acbdd086cd7a 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -710,70 +710,6 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
}
}

-/*
- * For a page array, a piece comes from the first page in the array
- * that has not already been fully consumed.
- */
-static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
- size_t length)
-{
- struct ceph_msg_data *data = cursor->data;
- int page_count;
-
- BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
-
- BUG_ON(!data->pages);
- BUG_ON(!data->length);
-
- cursor->resid = min(length, data->length);
- page_count = calc_pages_for(data->offset, (u64)data->length);
- cursor->page_offset = data->offset & ~PAGE_MASK;
- cursor->page_index = 0;
- BUG_ON(page_count > (int)USHRT_MAX);
- cursor->page_count = (unsigned short)page_count;
- BUG_ON(length > SIZE_MAX - cursor->page_offset);
-}
-
-static struct page *
-ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
- size_t *page_offset, size_t *length)
-{
- struct ceph_msg_data *data = cursor->data;
-
- BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
-
- BUG_ON(cursor->page_index >= cursor->page_count);
- BUG_ON(cursor->page_offset >= PAGE_SIZE);
-
- *page_offset = cursor->page_offset;
- *length = min_t(size_t, cursor->resid, PAGE_SIZE - *page_offset);
- return data->pages[cursor->page_index];
-}
-
-static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
- size_t bytes)
-{
- BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
-
- BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
-
- /* Advance the cursor page offset */
-
- cursor->resid -= bytes;
- cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
- if (!bytes || cursor->page_offset)
- return false; /* more bytes to process in the current page */
-
- if (!cursor->resid)
- return false; /* no more data */
-
- /* Move on to the next page; offset is already at 0 */
-
- BUG_ON(cursor->page_index >= cursor->page_count);
- cursor->page_index++;
- return true;
-}
-
static void ceph_msg_data_iter_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length)
{
@@ -844,9 +780,6 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
size_t length = cursor->total_resid;

switch (cursor->data->type) {
- case CEPH_MSG_DATA_PAGES:
- ceph_msg_data_pages_cursor_init(cursor, length);
- break;
case CEPH_MSG_DATA_ITER:
ceph_msg_data_iter_cursor_init(cursor, length);
break;
@@ -883,9 +816,6 @@ struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
struct page *page;

switch (cursor->data->type) {
- case CEPH_MSG_DATA_PAGES:
- page = ceph_msg_data_pages_next(cursor, page_offset, length);
- break;
case CEPH_MSG_DATA_ITER:
page = ceph_msg_data_iter_next(cursor, page_offset, length);
break;
@@ -913,9 +843,6 @@ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)

BUG_ON(bytes > cursor->resid);
switch (cursor->data->type) {
- case CEPH_MSG_DATA_PAGES:
- new_piece = ceph_msg_data_pages_advance(cursor, bytes);
- break;
case CEPH_MSG_DATA_ITER:
new_piece = ceph_msg_data_iter_advance(cursor, bytes);
break;
@@ -1644,12 +1571,8 @@ static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)

static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{
- if (data->release_dbuf) {
+ if (data->release_dbuf)
ceph_databuf_release(data->dbuf);
- } else if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
- int num_pages = calc_pages_for(data->offset, data->length);
- ceph_release_page_vector(data->pages, num_pages);
- }
}

void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf)
@@ -1670,25 +1593,6 @@ void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf)
}
EXPORT_SYMBOL(ceph_msg_data_add_databuf);

-void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
- size_t length, size_t offset, bool own_pages)
-{
- struct ceph_msg_data *data;
-
- BUG_ON(!pages);
- BUG_ON(!length);
-
- data = ceph_msg_data_add(msg);
- data->type = CEPH_MSG_DATA_PAGES;
- data->pages = pages;
- data->length = length;
- data->offset = offset & ~PAGE_MASK;
- data->own_pages = own_pages;
-
- msg->data_length += length;
-}
-EXPORT_SYMBOL(ceph_msg_data_add_pages);
-
void ceph_msg_data_add_iter(struct ceph_msg *msg,
struct iov_iter *iter)
{
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 70f81a0b62c0..6fb78ae14f03 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -829,8 +829,6 @@ EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
struct ceph_osd_data *osd_data)
{
- u64 length = ceph_osd_data_length(osd_data);
-
if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
ceph_msg_data_add_iter(msg, &osd_data->iter);
} else {