2010-04-13 23:53:27

by Yehuda Sadeh

Subject: [PATCH 0/6] Ceph RADOS block device

The following series implements a Linux rados block device. It allows striping of data across rados, the ceph distributed object store, and is binary compatible with Christian Brunner's kvm-rbd implementation. Similarly to the Sheepdog project, it stripes the block device across 4MB (or other configurable size) objects stored by the distributed object store, and enjoys all the rados features (high availability, scalability, etc.). Future versions will be able to use the rados snapshot mechanism.
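
As an illustration (not part of the series itself), the striping arithmetic boils down to a shift and a mask. Here is a minimal user-space sketch, assuming the default object order of 22 (4MB objects) and an example image named "foo", mirroring what rbd_get_segment() does in the driver:

    #include <stdio.h>
    #include <stdint.h>

    #define OBJ_ORDER 22  /* 4MB objects: 1 << 22 bytes each */

    int main(void)
    {
            uint64_t ofs = 0x1234567;         /* byte offset into the device */
            uint64_t seg = ofs >> OBJ_ORDER;  /* index of the object holding it */
            uint64_t seg_ofs = ofs & ((1ULL << OBJ_ORDER) - 1); /* offset in it */

            /* data objects are named <image>.<segment>, e.g. foo.000000000004 */
            printf("object foo.%012llx, offset 0x%llx\n",
                   (unsigned long long)seg, (unsigned long long)seg_ofs);
            return 0;
    }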

A use case for this device would be to create some kind of local file system on it and use it in conjunction with kvm to do migration and other related tasks. Another option would be to use it as a data device for other distributed file systems (e.g., ocfs2, gfs2).

The actual device driver is implemented in the last patch of the series and is based on osdblk. Currently it resides under the fs/ceph tree and does not exist as a separate module of its own, so the ceph.ko module contains both the ceph file system and the rbd block device. Another option would be to have it as a separate module under drivers/block; however, it would still depend on the ceph.ko module.

Any comments, questions, suggestions are more than welcome,

Yehuda

---
fs/ceph/Makefile | 3 +-
fs/ceph/caps.c | 2 +-
fs/ceph/debugfs.c | 11 +-
fs/ceph/file.c | 56 +++-
fs/ceph/mds_client.c | 11 +-
fs/ceph/messenger.c | 185 +++++++--
fs/ceph/messenger.h | 5 +-
fs/ceph/mon_client.c | 18 +-
fs/ceph/msgpool.c | 4 +-
fs/ceph/osd_client.c | 193 ++++++---
fs/ceph/osd_client.h | 27 ++
fs/ceph/rbd.c | 1224 ++++++++++++++++++++++++++++++++++++++++++++++++++
fs/ceph/rbd.h | 8 +
fs/ceph/super.c | 165 ++++++--
fs/ceph/super.h | 31 ++-
15 files changed, 1786 insertions(+), 157 deletions(-)


2010-04-13 23:52:25

by Yehuda Sadeh

Subject: [PATCH 4/6] ceph: enable creation of clients that don't need mds

Prepares the ground for rbd, which doesn't need an mds client.
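
For example, an mds-less client, as rbd will create one, would look roughly like this (sketch only, error handling trimmed):

    struct ceph_client *client;

    client = ceph_create_client(args, 0);   /* need_mdsc == 0 */
    if (IS_ERR(client))
            return PTR_ERR(client);
    /* client->have_mdsc stays 0, so any incoming CEPH_MSG_MDS_MAP
       is simply ignored in dispatch() */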

Signed-off-by: Yehuda Sadeh <[email protected]>
---
fs/ceph/debugfs.c | 11 ++++++++---
fs/ceph/mon_client.c | 3 ++-
fs/ceph/super.c | 15 ++++++++++-----
fs/ceph/super.h | 2 ++
4 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index e159f14..0a60fa8 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -434,9 +434,14 @@ int ceph_debugfs_client_init(struct ceph_client *client)
if (!client->debugfs_congestion_kb)
goto out;

- sprintf(name, "../../bdi/%s", dev_name(client->sb->s_bdi->dev));
- client->debugfs_bdi = debugfs_create_symlink("bdi", client->debugfs_dir,
- name);
+ if (client->backing_dev_info.dev) {
+ sprintf(name, "../../bdi/%s",
+ dev_name(client->backing_dev_info.dev));
+ client->debugfs_bdi =
+ debugfs_create_symlink("bdi",
+ client->debugfs_dir,
+ name);
+ }

return 0;

diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 455c973..0419d42 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -776,7 +776,8 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
break;

case CEPH_MSG_MDS_MAP:
- ceph_mdsc_handle_map(&monc->client->mdsc, msg);
+ if (monc->client->have_mdsc)
+ ceph_mdsc_handle_map(&monc->client->mdsc, msg);
break;

case CEPH_MSG_OSD_MAP:
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 3ac0a90..4e7adf9 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -537,7 +537,8 @@ static void destroy_mount_args(struct ceph_mount_args *args)
/*
* create a fresh client instance
*/
-static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
+struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
+ int need_mdsc)
{
struct ceph_client *client;
int err = -ENOMEM;
@@ -592,9 +593,13 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
err = ceph_osdc_init(&client->osdc, client);
if (err < 0)
goto fail_monc;
- err = ceph_mdsc_init(&client->mdsc, client);
- if (err < 0)
- goto fail_osdc;
+ if (need_mdsc) {
+ err = ceph_mdsc_init(&client->mdsc, client);
+ if (err < 0)
+ goto fail_osdc;
+ client->have_mdsc = 1;
+ }
+
return client;

fail_osdc:
@@ -903,7 +908,7 @@ static int ceph_get_sb(struct file_system_type *fs_type,
}

/* create client (which we may/may not use) */
- client = ceph_create_client(args);
+ client = ceph_create_client(args, 1);
if (IS_ERR(client)) {
err = PTR_ERR(client);
goto out_final;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index b6c197d..78ee529 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -135,6 +135,8 @@ struct ceph_client {

int min_caps; /* min caps i added */

+ int have_mdsc;
+
struct ceph_messenger *msgr; /* messenger instance */
struct ceph_mon_client monc;
struct ceph_mds_client mdsc;
--
1.5.6.5

2010-04-13 23:52:29

by Yehuda Sadeh

Subject: [PATCH 5/6] ceph: refactor mount related functions, add helpers

Removed some functions' static declarations, and split the mount
operation into __ceph_open_session and open_root_dentry, so that
clients that don't need the latter (rbd) can skip it. Added other
helper functions that will be used later by rbd.
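
Taken together, the exported helpers let a non-fs client join the
cluster without mounting anything. A rough sketch of the intended
call sequence (error handling omitted; see the rbd patch for the
real thing):

    struct ceph_mount_args *args;
    struct ceph_client *client;

    /* NULL path: dev_name carries only the monitor address(es) */
    args = parse_mount_args(0, options, mon_addr, NULL);

    client = ceph_create_client(args, 0);   /* no mds client */
    err = ceph_open_session(client);        /* waits for mon/osd maps only */
    /* ... issue osd requests ... */
    ceph_destroy_client(client);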

Signed-off-by: Yehuda Sadeh <[email protected]>
---
fs/ceph/file.c | 46 ++++++++++++++++
fs/ceph/osd_client.h | 1 +
fs/ceph/super.c | 142 ++++++++++++++++++++++++++++++++++++++-----------
fs/ceph/super.h | 29 +++++++++--
4 files changed, 182 insertions(+), 36 deletions(-)

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index ef8f9e9..bf7d002 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -362,6 +362,52 @@ static int copy_user_to_page_vector(struct page **pages,
return len;
}

+int ceph_copy_to_page_vector(struct page **pages,
+ const char *data,
+ loff_t off, size_t len)
+{
+ int i = 0;
+ int po = off & ~PAGE_CACHE_MASK;
+ int left = len;
+ int l;
+
+ while (left > 0) {
+ l = min_t(int, PAGE_CACHE_SIZE-po, left);
+ memcpy(page_address(pages[i]) + po, data, l);
+ data += l;
+ left -= l;
+ po += l;
+ if (po == PAGE_CACHE_SIZE) {
+ po = 0;
+ i++;
+ }
+ }
+ return len;
+}
+
+int ceph_copy_from_page_vector(struct page **pages,
+ char *data,
+ loff_t off, size_t len)
+{
+ int i = 0;
+ int po = off & ~PAGE_CACHE_MASK;
+ int left = len;
+ int l;
+
+ while (left > 0) {
+ l = min_t(int, PAGE_CACHE_SIZE-po, left);
+ memcpy(data, page_address(pages[i]) + po, l);
+ data += l;
+ left -= l;
+ po += l;
+ if (po == PAGE_CACHE_SIZE) {
+ po = 0;
+ i++;
+ }
+ }
+ return len;
+}
+
/*
* copy user data from a page vector into a user pointer
*/
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 76aa63e..870b323 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -67,6 +67,7 @@ struct ceph_osd_request {

struct inode *r_inode; /* for use by callbacks */
struct writeback_control *r_wbc; /* ditto */
+ void *r_priv; /* ditto */

char r_oid[40]; /* object name */
int r_oid_len;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 4e7adf9..52e9e86 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -344,14 +344,15 @@ static match_table_t arg_tokens = {
};


-static struct ceph_mount_args *parse_mount_args(int flags, char *options,
- const char *dev_name,
- const char **path)
+struct ceph_mount_args *parse_mount_args(int flags, char *options,
+ const char *dev_name,
+ const char **path)
{
struct ceph_mount_args *args;
const char *c;
int err = -ENOMEM;
substring_t argstr[MAX_OPT_ARGS];
+ const char *end_path;

args = kzalloc(sizeof(*args), GFP_KERNEL);
if (!args)
@@ -382,23 +383,29 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
err = -EINVAL;
if (!dev_name)
goto out;
- *path = strstr(dev_name, ":/");
- if (*path == NULL) {
- pr_err("device name is missing path (no :/ in %s)\n",
- dev_name);
- goto out;
+
+ if (path) {
+ *path = strstr(dev_name, ":/");
+ if (*path == NULL) {
+ pr_err("device name is missing path (no :/ in %s)\n",
+ dev_name);
+ goto out;
+ }
+ end_path = *path;
+
+ /* path on server */
+ *path += 2;
+ dout("server path '%s'\n", *path);
+ } else {
+ end_path = dev_name + strlen(dev_name);
}

/* get mon ip(s) */
- err = ceph_parse_ips(dev_name, *path, args->mon_addr,
+ err = ceph_parse_ips(dev_name, end_path, args->mon_addr,
CEPH_MAX_MON, &args->num_mon);
if (err < 0)
goto out;

- /* path on server */
- *path += 2;
- dout("server path '%s'\n", *path);
-
/* parse mount options */
while ((c = strsep(&options, ",")) != NULL) {
int token, intval, ret;
@@ -534,6 +541,51 @@ static void destroy_mount_args(struct ceph_mount_args *args)
kfree(args);
}

+static int strcmp_null(const char *s1, const char *s2)
+{
+ if (!s1 && !s2)
+ return 0;
+ if (s1 && !s2)
+ return -1;
+ if (!s1 && s2)
+ return 1;
+ return strcmp(s1, s2);
+}
+
+int ceph_compare_mount_args(struct ceph_mount_args *new_args,
+ struct ceph_client *client)
+{
+ struct ceph_mount_args *args1 = new_args;
+ struct ceph_mount_args *args2 = client->mount_args;
+ int ofs = offsetof(struct ceph_mount_args, mon_addr);
+ int i;
+ int ret;
+
+ ret = memcmp(args1, args2, ofs);
+ if (ret)
+ return ret;
+
+ ret = strcmp_null(args1->snapdir_name, args2->snapdir_name);
+ if (ret)
+ return ret;
+
+ ret = strcmp_null(args1->name, args2->name);
+ if (ret)
+ return ret;
+
+ ret = strcmp_null(args1->secret, args2->secret);
+ if (ret)
+ return ret;
+
+ for (i = 0; i < args1->num_mon; i++) {
+ if (ceph_monmap_contains(client->monc.monmap,
+ &args1->mon_addr[i]))
+ return 0;
+ }
+
+ return -1;
+}
+
/*
* create a fresh client instance
*/
@@ -621,7 +673,7 @@ fail:
return ERR_PTR(err);
}

-static void ceph_destroy_client(struct ceph_client *client)
+void ceph_destroy_client(struct ceph_client *client)
{
dout("destroy_client %p\n", client);

@@ -722,17 +774,12 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
/*
* mount: join the ceph cluster, and open root directory.
*/
-static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
- const char *path)
+static int __ceph_open_session(struct ceph_client *client,
+ unsigned long started)
{
struct ceph_entity_addr *myaddr = NULL;
int err;
unsigned long timeout = client->mount_args->mount_timeout * HZ;
- unsigned long started = jiffies; /* note the start time */
- struct dentry *root;
-
- dout("mount start\n");
- mutex_lock(&client->mount_mutex);

/* initialize the messenger */
if (client->msgr == NULL) {
@@ -740,9 +787,8 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
myaddr = &client->mount_args->my_addr;
client->msgr = ceph_messenger_create(myaddr);
if (IS_ERR(client->msgr)) {
- err = PTR_ERR(client->msgr);
client->msgr = NULL;
- goto out;
+ return PTR_ERR(client->msgr);
}
client->msgr->nocrc = ceph_test_opt(client, NOCRC);
}
@@ -750,26 +796,58 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
/* open session, and wait for mon, mds, and osd maps */
err = ceph_monc_open_session(&client->monc);
if (err < 0)
- goto out;
+ return err;

while (!have_mon_and_osd_map(client)) {
err = -EIO;
if (timeout && time_after_eq(jiffies, started + timeout))
- goto out;
+ return err;

/* wait */
dout("mount waiting for mon_map\n");
err = wait_event_interruptible_timeout(client->auth_wq,
- have_mon_and_osd_map(client) || (client->auth_err < 0),
- timeout);
+ have_mon_and_osd_map(client) || (client->auth_err < 0),
+ timeout);
if (err == -EINTR || err == -ERESTARTSYS)
- goto out;
- if (client->auth_err < 0) {
- err = client->auth_err;
- goto out;
- }
+ return err;
+ if (client->auth_err < 0)
+ return client->auth_err;
}

+ return 0;
+}
+
+int ceph_open_session(struct ceph_client *client)
+{
+ int ret;
+ unsigned long started = jiffies; /* note the start time */
+
+ dout("open_session start\n");
+ mutex_lock(&client->mount_mutex);
+
+ ret = __ceph_open_session(client, started);
+
+ mutex_unlock(&client->mount_mutex);
+ return ret;
+}
+
+/*
+ * mount: join the ceph cluster, and open root directory.
+ */
+static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
+ const char *path)
+{
+ int err;
+ unsigned long started = jiffies; /* note the start time */
+ struct dentry *root;
+
+ dout("mount start\n");
+ mutex_lock(&client->mount_mutex);
+
+ err = __ceph_open_session(client, started);
+ if (err < 0)
+ goto out;
+
dout("mount opening root\n");
root = open_root_dentry(client, "", started);
if (IS_ERR(root)) {
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 78ee529..9761768 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -47,11 +47,8 @@
#define ceph_test_opt(client, opt) \
(!!((client)->mount_args->flags & CEPH_OPT_##opt))

-
struct ceph_mount_args {
int sb_flags;
- int num_mon;
- struct ceph_entity_addr *mon_addr;
int flags;
int mount_timeout;
int osd_idle_ttl;
@@ -64,10 +61,18 @@ struct ceph_mount_args {
int congestion_kb; /* max readdir size */
int osd_timeout;
int osd_keepalive_timeout;
+ int cap_release_safety;
+
+ /* any type that can't be simply compared or doesn't need
+ to be compared should go beyond this point,
+ ceph_compare_mount_args() should be updated accordingly */
+ struct ceph_entity_addr *mon_addr; /* should be the first
+ pointer type of args */
+ int num_mon;
+
char *snapdir_name; /* default ".snap" */
char *name;
char *secret;
- int cap_release_safety;
};

/*
@@ -736,6 +741,15 @@ extern struct kmem_cache *ceph_file_cachep;

extern const char *ceph_msg_type_name(int type);
extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
+extern struct ceph_mount_args *parse_mount_args(int flags, char *options,
+ const char *dev_name,
+ const char **path);
+extern int ceph_compare_mount_args(struct ceph_mount_args *new_args,
+ struct ceph_client *client);
+extern struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
+ int need_mdsc);
+extern void ceph_destroy_client(struct ceph_client *client);
+extern int ceph_open_session(struct ceph_client *client);

#define FSID_FORMAT "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-" \
"%02x%02x%02x%02x%02x%02x"
@@ -846,6 +860,13 @@ extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
/* file.c */
extern const struct file_operations ceph_file_fops;
extern const struct address_space_operations ceph_aops;
+extern int ceph_copy_to_page_vector(struct page **pages,
+ const char *data,
+ loff_t off, size_t len);
+extern int ceph_copy_from_page_vector(struct page **pages,
+ char *data,
+ loff_t off, size_t len);
+extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags);
extern int ceph_open(struct inode *inode, struct file *file);
extern struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
struct nameidata *nd, int mode,
--
1.5.6.5

2010-04-13 23:52:23

by Yehuda Sadeh

Subject: [PATCH 6/6] ceph: rados block device

This adds rbd under fs/ceph. The rados block device is
based on drivers/block/osdblk.c. It allows mapping of
rados (the ceph distributed object store) objects over a
block device. Each device has a metadata object, and data
is striped across different objects.
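
For a mapped image named "foo" (an example name) with the default
4MB objects, the resulting layout in the rados pool looks like this:

    foo.rbd            metadata object (struct rbd_obj_header_ondisk)
    foo.000000000000   data, image bytes 0 through 4MB-1
    foo.000000000001   data, image bytes 4MB through 8MB-1
    ...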

Signed-off-by: Yehuda Sadeh <[email protected]>
---
fs/ceph/Makefile | 3 +-
fs/ceph/rbd.c | 1224 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
fs/ceph/rbd.h | 8 +
fs/ceph/super.c | 8 +
4 files changed, 1242 insertions(+), 1 deletions(-)

diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 6a660e6..f06ff40 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -16,7 +16,8 @@ ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
auth.o auth_none.o \
crypto.o armor.o \
auth_x.o \
- ceph_fs.o ceph_strings.o ceph_hash.o ceph_frag.o
+ ceph_fs.o ceph_strings.o ceph_hash.o ceph_frag.o \
+ rbd.o

else
#Otherwise we were called directly from the command
diff --git a/fs/ceph/rbd.c b/fs/ceph/rbd.c
new file mode 100644
index 0000000..3f7a7bd
--- /dev/null
+++ b/fs/ceph/rbd.c
@@ -0,0 +1,1224 @@
+/*
+ rbd.c -- Export ceph rados objects as a Linux block device
+
+
+ based on drivers/block/osdblk.c:
+
+ Copyright 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to
+ the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+
+
+
+ Instructions for use
+ --------------------
+
+ 1) Map a Linux block device to an existing rbd image.
+
+ Usage: <mon ip addr> <options> <pool id> <rbd image name>
+
+ $ echo "192.168.0.1 name=admin 0 foo" > /sys/class/rbd/add
+
+
+ 2) List all active blkdev<->object mappings.
+
+ In this example, we have performed step #1 twice, creating two blkdevs,
+ mapped to two separate rados objects in rados pool 0.
+
+ $ cat /sys/class/rbd/list
+ 0 254 0 foo
+ 1 253 0 bar
+
+ The columns, in order, are:
+ - blkdev unique id
+ - blkdev assigned major
+ - rados pool id
+ - rados block device name
+
+
+ 3) Remove an active blkdev<->rbd image mapping.
+
+ In this example, we remove the mapping with blkdev unique id 1.
+
+ $ echo 1 > /sys/class/rbd/remove
+
+
+ NOTE: The actual creation and deletion of rados objects is outside the scope
+ of this driver.
+
+ */
+
+#include "super.h"
+#include "osd_client.h"
+
+#include <linux/kernel.h>
+#include <linux/device.h>
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/blkdev.h>
+
+#define DRV_NAME "rbd"
+#define DRV_NAME_LONG "rbd (rados block device)"
+
+
+enum {
+ RBD_MINORS_PER_MAJOR = 256, /* max minors per blkdev */
+};
+
+static const char rbd_text[] = "<<< Rados Block Device Image >>>\n";
+static const char rbd_signature[] = "RBD";
+static const char rbd_version[] = "001.000";
+
+#define RBD_COMP_NONE 0
+#define RBD_CRYPT_NONE 0
+
+#define RBD_DEFAULT_OBJ_ORDER 22 /* 4MB */
+
+#define RBD_SUFFIX ".rbd"
+
+#define RBD_MAX_OBJ_NAME_SIZE 96
+#define RBD_MAX_SEG_NAME_SIZE 128
+
+#define RBD_STRIPE_UNIT (1 << 22)
+
+#define RBD_MAX_OPT_LEN 1024
+
+
+struct rbd_obj_header_ondisk {
+ char text[64];
+ char signature[4];
+ char version[8];
+ __le64 image_size;
+ __u8 obj_order;
+ __u8 crypt_type;
+ __u8 comp_type;
+ __le64 snap_seq;
+ __le16 snap_count;
+ __le64 snap_id[0];
+} __attribute__((packed));
+
+struct rbd_obj_header {
+ u64 image_size;
+ __u8 obj_order;
+ __u8 crypt_type;
+ __u8 comp_type;
+ u64 snap_seq;
+ u16 snap_count;
+ u64 snap_id[0];
+};
+
+struct rbd_request {
+ struct request *rq; /* blk layer request */
+ struct bio *bio; /* cloned bio */
+ struct page **pages; /* list of used pages */
+ u64 len;
+};
+
+struct rbd_client_node {
+ struct ceph_client *client;
+ const char *opt;
+ struct kref kref;
+ struct list_head node;
+};
+
+#define DEV_NAME_LEN 32
+
+struct rbd_device {
+ int id; /* blkdev unique id */
+
+ int major; /* blkdev assigned major */
+ struct gendisk *disk; /* blkdev's gendisk and rq */
+ struct request_queue *q;
+
+ struct ceph_client *client; /* associated OSD */
+
+ char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd34 */
+
+ spinlock_t lock; /* queue lock */
+
+ struct rbd_obj_header *header;
+ char obj[RBD_MAX_OBJ_NAME_SIZE]; /* rbd image name */
+ int obj_len;
+ int poolid;
+
+ struct list_head node;
+ struct rbd_client_node *client_node;
+};
+
+static spinlock_t node_lock; /* protects client get/put */
+
+static struct class *class_rbd; /* /sys/class/rbd */
+static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
+static LIST_HEAD(rbddev_list);
+static LIST_HEAD(node_list);
+
+static const struct block_device_operations rbd_bd_ops = {
+ .owner = THIS_MODULE,
+};
+
+/*
+ * Initialize ceph client for a specific device.
+ */
+static int rbd_init_client(struct rbd_device *rbd_dev,
+ struct ceph_mount_args *args)
+{
+ dout("rbd_init_device\n");
+ rbd_dev->client = ceph_create_client(args, 0);
+ if (IS_ERR(rbd_dev->client))
+ return PTR_ERR(rbd_dev->client);
+
+ return ceph_open_session(rbd_dev->client);
+}
+
+/*
+ * Find a ceph client with specific addr and configuration.
+ */
+static struct rbd_client_node *__get_client_node(struct ceph_mount_args *args)
+{
+ struct rbd_client_node *client_node;
+
+ if (args->flags & CEPH_OPT_NOSHARE)
+ return NULL;
+
+ list_for_each_entry(client_node, &node_list, node)
+ if (ceph_compare_mount_args(args, client_node->client) == 0)
+ return client_node;
+ return NULL;
+}
+
+/*
+ * Get a ceph client with specific addr and configuration, if one does
+ * not exist create it.
+ */
+static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
+ char *opt)
+{
+ struct rbd_client_node *client_node;
+ struct ceph_mount_args *args;
+ int ret;
+
+ spin_lock(&node_lock);
+
+ args = parse_mount_args(0, opt, mon_addr, NULL);
+ if (IS_ERR(args))
+ return PTR_ERR(args);
+
+ client_node = __get_client_node(args);
+ if (client_node) {
+ kfree(args);
+
+ kref_get(&client_node->kref);
+ rbd_dev->client_node = client_node;
+ rbd_dev->client = client_node->client;
+ spin_unlock(&node_lock);
+ return 0;
+ }
+
+ spin_unlock(&node_lock);
+
+ ret = -ENOMEM;
+ client_node = kmalloc(sizeof(struct rbd_client_node), GFP_KERNEL);
+ if (!client_node)
+ goto out;
+
+ ret = rbd_init_client(rbd_dev, args);
+ if (ret < 0)
+ goto out_free;
+
+ client_node->client = rbd_dev->client;
+ client_node->opt = kstrdup(opt, GFP_KERNEL);
+ kref_init(&client_node->kref);
+ INIT_LIST_HEAD(&client_node->node);
+
+ rbd_dev->client_node = client_node;
+
+ spin_lock(&node_lock);
+ list_add_tail(&client_node->node, &node_list);
+ spin_unlock(&node_lock);
+
+ return 0;
+
+out_free:
+ kfree(client_node);
+out:
+ return ret;
+}
+
+/*
+ * Destroy ceph client
+ */
+static void rbd_release_client(struct kref *kref)
+{
+ struct rbd_client_node *node =
+ container_of(kref, struct rbd_client_node, kref);
+
+ dout("rbd_release_client\n");
+
+ spin_lock(&node_lock);
+ list_del(&node->node);
+ spin_unlock(&node_lock);
+
+ ceph_destroy_client(node->client);
+ kfree(node->opt);
+ kfree(node);
+}
+
+/*
+ * Drop reference to ceph client node. If it's not referenced anymore, release
+ * it.
+ */
+static void rbd_put_client(struct rbd_device *rbd_dev)
+{
+ if (!rbd_dev->client_node)
+ return;
+
+ kref_put(&rbd_dev->client_node->kref, rbd_release_client);
+ rbd_dev->client_node = NULL;
+}
+
+
+/*
+ * Create a new header structure, translate header format from the on-disk
+ * header.
+ */
+static int rbd_header_from_disk(struct rbd_obj_header **header,
+ struct rbd_obj_header_ondisk *ondisk,
+ gfp_t gfp_flags)
+{
+ int i;
+ u16 snap_count = le16_to_cpu(ondisk->snap_count);
+
+ *header = kmalloc(sizeof(struct rbd_obj_header) +
+ snap_count * sizeof(u32), gfp_flags);
+ if (!*header)
+ return -ENOMEM;
+
+ (*header)->image_size = le64_to_cpu(ondisk->image_size);
+ (*header)->obj_order = ondisk->obj_order;
+ (*header)->crypt_type = ondisk->crypt_type;
+ (*header)->comp_type = ondisk->comp_type;
+ (*header)->snap_seq = le64_to_cpu(ondisk->snap_seq);
+ (*header)->snap_count = snap_count;
+
+ for (i = 0; i < snap_count; i++)
+ (*header)->snap_id[i] = le64_to_cpu(ondisk->snap_id[i]);
+
+ return 0;
+}
+
+/*
+ * get the actual striped segment name, offset and length
+ */
+static u64 rbd_get_segment(struct rbd_obj_header *header,
+ const char *obj_name,
+ u64 ofs, u64 len,
+ char *seg_name, u64 *segofs)
+{
+ u64 seg = ofs >> header->obj_order;
+
+ if (seg_name)
+ snprintf(seg_name, RBD_MAX_SEG_NAME_SIZE,
+ "%s.%012llx", obj_name, seg);
+
+ ofs = ofs & ((1 << header->obj_order) - 1);
+ len = min_t(u64, len, (1 << header->obj_order) - ofs);
+
+ if (segofs)
+ *segofs = ofs;
+
+ return len;
+}
+
+static void bio_chain_put(struct bio *chain)
+{
+ struct bio *tmp;
+
+ while (chain) {
+ tmp = chain;
+ chain = chain->bi_next;
+
+ bio_put(tmp);
+ }
+}
+
+/*
+ * zeros a bio chain, starting at specific offset
+ */
+static void zero_bio_chain(struct bio *chain, int start_ofs)
+{
+ struct bio_vec *bv;
+ unsigned long flags;
+ void *buf;
+ int i;
+ int pos = 0;
+
+ while (chain) {
+ bio_for_each_segment(bv, chain, i) {
+ if (pos + bv->bv_len > start_ofs) {
+ int remainder = max(start_ofs - pos, 0);
+ buf = bvec_kmap_irq(bv, &flags);
+ memset(buf + remainder, 0,
+ bv->bv_len - remainder);
+ bvec_kunmap_irq(bv, &flags);
+ }
+ pos += bv->bv_len;
+ }
+
+ chain = chain->bi_next;
+ }
+}
+
+/*
+ * bio_chain_clone - clone a chain of bios up to a certain length.
+ * might return a bio_pair that will need to be released.
+ */
+static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
+ struct bio_pair **bp,
+ int len, gfp_t gfpmask)
+{
+ struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
+ int total = 0;
+
+ if (*bp) {
+ bio_pair_release(*bp);
+ *bp = NULL;
+ }
+
+ while (old_chain && (total < len)) {
+ tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
+ if (!tmp)
+ goto err_out;
+
+ if (total + old_chain->bi_size > len) {
+ struct bio_pair *bp;
+
+ /*
+ * this split can only happen with a single paged bio,
+ * split_bio will BUG_ON if this is not the case
+ */
+ dout("bio_chain_clone split! total=%d remaining=%d bi_size=%d\n",
+ (int)total, (int)len-total, (int)old_chain->bi_size);
+
+ /* split the bio. We'll release it either in the next
+ call, or it will have to be released outside */
+ bp = bio_split(old_chain, (len - total) / 512ULL);
+ if (!bp)
+ goto err_out;
+
+ __bio_clone(tmp, &bp->bio1);
+
+ *next = &bp->bio2;
+ } else {
+ __bio_clone(tmp, old_chain);
+ *next = old_chain->bi_next;
+ }
+
+ tmp->bi_bdev = NULL;
+ gfpmask &= ~__GFP_WAIT;
+ tmp->bi_next = NULL;
+
+ if (!new_chain) {
+ new_chain = tail = tmp;
+ } else {
+ tail->bi_next = tmp;
+ tail = tmp;
+ }
+ old_chain = old_chain->bi_next;
+
+ total += tmp->bi_size;
+ }
+
+ BUG_ON(total < len);
+
+ if (tail)
+ tail->bi_next = NULL;
+
+ *old = old_chain;
+
+ return new_chain;
+
+err_out:
+ dout("bio_chain_clone with err\n");
+ bio_chain_put(new_chain);
+ return NULL;
+}
+
+static void rbd_req_flush_object(struct request *rq,
+ const char *obj,
+ u64 offset, u64 len)
+{
+ /* not really flushing anything currently */
+ blk_end_request_all(rq, 0);
+}
+
+/*
+ * Send ceph osd request
+ */
+static int rbd_do_request(struct request *rq,
+ struct rbd_device *dev,
+ const char *obj, u64 ofs, u64 len,
+ struct bio *bio,
+ struct page **pages,
+ int num_pages,
+ int opcode, int flags,
+ int num_reply,
+ void (*rbd_cb)(struct ceph_osd_request *req,
+ struct ceph_msg *msg))
+{
+ struct ceph_osd_request *req;
+ struct ceph_file_layout *layout;
+ int ret;
+ u64 bno;
+ struct timespec mtime = CURRENT_TIME;
+ struct rbd_request *req_data;
+ struct ceph_osd_request_head *reqhead;
+
+ ret = -ENOMEM;
+ req_data = kzalloc(sizeof(*req_data), GFP_NOFS);
+ if (!req_data)
+ goto done;
+
+ dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
+
+ req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
+ NULL, 0,
+ false,
+ GFP_NOFS, pages, bio);
+ if (IS_ERR(req)) {
+ ret = PTR_ERR(req);
+ goto done_pages;
+ }
+
+ req->r_callback = rbd_cb;
+
+ req_data->rq = rq;
+ req_data->bio = bio;
+ req_data->pages = pages;
+ req_data->len = len;
+
+ req->r_priv = req_data;
+
+ reqhead = req->r_request->front.iov_base;
+ reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
+
+ strncpy(req->r_oid, obj, sizeof(req->r_oid));
+ req->r_oid_len = strlen(req->r_oid);
+
+ layout = &req->r_file_layout;
+ memset(layout, 0, sizeof(*layout));
+ layout->fl_stripe_unit = RBD_STRIPE_UNIT;
+ layout->fl_stripe_count = 1;
+ layout->fl_object_size = RBD_STRIPE_UNIT;
+ layout->fl_pg_preferred = -1;
+ layout->fl_pg_pool = dev->poolid;
+ ceph_calc_raw_layout(&dev->client->osdc, layout, ofs, len, &bno, req);
+
+ ceph_osdc_build_request(req, ofs, &len, opcode,
+ NULL, 0,
+ 0, 0,
+ &mtime,
+ req->r_oid, req->r_oid_len);
+
+ ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
+ if (ret < 0)
+ goto done_err;
+
+ if (!rbd_cb) {
+ ret = ceph_osdc_wait_request(&dev->client->osdc, req);
+ ceph_osdc_put_request(req);
+ }
+ return ret;
+
+done_err:
+ bio_chain_put(req_data->bio);
+ ceph_osdc_put_request(req);
+done_pages:
+ kfree(req_data);
+done:
+ if (rq)
+ blk_end_request(rq, ret, len);
+ return ret;
+}
+
+/*
+ * Ceph osd op callback
+ */
+static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+{
+ struct rbd_request *req_data = req->r_priv;
+ struct ceph_osd_reply_head *replyhead;
+ struct ceph_osd_op *op;
+ __s32 rc;
+ u64 bytes;
+ int read_op;
+
+ /* parse reply */
+ replyhead = msg->front.iov_base;
+ WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
+ op = (void *)(replyhead + 1);
+ rc = le32_to_cpu(replyhead->result);
+ bytes = le64_to_cpu(op->extent.length);
+
+ dout("rbd_req_cb bytes=%lld rc=%d\n", bytes, rc);
+
+ read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
+
+ if (rc == -ENOENT && read_op) {
+ zero_bio_chain(req_data->bio, 0);
+ rc = 0;
+ } else if (rc == 0 && read_op && bytes < req_data->len) {
+ zero_bio_chain(req_data->bio, bytes);
+ bytes = req_data->len;
+ }
+
+ blk_end_request(req_data->rq, rc, bytes);
+
+ if (req_data->bio)
+ bio_chain_put(req_data->bio);
+
+ ceph_osdc_put_request(req);
+ kfree(req_data);
+}
+
+/*
+ * Do a synchronous ceph osd operation
+ */
+static int rbd_req_sync_op(struct rbd_device *dev,
+ int opcode, int flags,
+ int num_reply,
+ const char *obj,
+ u64 ofs, u64 len,
+ char *buf)
+{
+ int ret;
+ struct page **pages;
+ int num_pages;
+
+ num_pages = calc_pages_for(ofs, len);
+ pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+ if (!pages)
+ return -ENOMEM;
+
+ if (flags & CEPH_OSD_FLAG_WRITE) {
+ ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
+ if (ret < 0)
+ goto done;
+ }
+
+ ret = rbd_do_request(NULL, dev, obj, ofs, len, NULL,
+ pages, num_pages,
+ opcode,
+ flags,
+ 2,
+ NULL);
+ if (ret < 0)
+ goto done;
+
+ if (flags & CEPH_OSD_FLAG_READ)
+ ret = ceph_copy_from_page_vector(pages, buf, ofs, len);
+
+done:
+ ceph_release_page_vector(pages, num_pages);
+ return ret;
+}
+
+/*
+ * Do an asynchronous ceph osd operation
+ */
+static int rbd_do_op(struct request *rq,
+ struct rbd_device *rbd_dev,
+ int opcode, int flags, int num_reply,
+ u64 ofs, u64 len,
+ struct bio *bio)
+{
+ char *seg_name;
+ u64 seg_ofs;
+ u64 seg_len;
+ int ret;
+
+ seg_name = kmalloc(RBD_MAX_SEG_NAME_SIZE + 1, GFP_NOIO);
+ if (!seg_name)
+ return -ENOMEM;
+
+ seg_len = rbd_get_segment(rbd_dev->header,
+ rbd_dev->obj,
+ ofs, len,
+ seg_name, &seg_ofs);
+ if (seg_len < 0)
+ return seg_len;
+
+ /* we've taken care of segment sizes earlier when we
+ cloned the bios. We should never have a segment
+ truncated at this point */
+ BUG_ON(seg_len < len);
+
+ ret = rbd_do_request(rq, rbd_dev,
+ seg_name, seg_ofs, seg_len,
+ bio,
+ NULL, 0,
+ opcode,
+ flags,
+ num_reply,
+ rbd_req_cb);
+ kfree(seg_name);
+ return ret;
+}
+
+/*
+ * Request async osd write
+ */
+static int rbd_req_write(struct request *rq,
+ struct rbd_device *rbd_dev,
+ u64 ofs, u64 len,
+ struct bio *bio)
+{
+ return rbd_do_op(rq, rbd_dev,
+ CEPH_OSD_OP_WRITE,
+ CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+ 2,
+ ofs, len, bio);
+}
+
+/*
+ * Request sync osd write
+ */
+static int __attribute__ ((unused))
+rbd_req_sync_write(struct rbd_device *dev,
+ const char *obj,
+ u64 ofs, u64 len,
+ char *buf)
+{
+ return rbd_req_sync_op(dev,
+ CEPH_OSD_OP_WRITE,
+ CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+ 2, obj, ofs, len, buf);
+}
+
+/*
+ * Request async osd read
+ */
+static int rbd_req_read(struct request *rq,
+ struct rbd_device *rbd_dev,
+ u64 ofs, u64 len,
+ struct bio *bio)
+{
+ return rbd_do_op(rq, rbd_dev,
+ CEPH_OSD_OP_READ,
+ CEPH_OSD_FLAG_READ,
+ 2,
+ ofs, len, bio);
+}
+
+/*
+ * Request sync osd read
+ */
+static int rbd_req_sync_read(struct rbd_device *dev,
+ const char *obj,
+ u64 ofs, u64 len,
+ char *buf)
+{
+ return rbd_req_sync_op(dev,
+ CEPH_OSD_OP_READ,
+ CEPH_OSD_FLAG_READ,
+ 1, obj, ofs, len, buf);
+}
+
+/*
+ * block device queue callback
+ */
+static void rbd_rq_fn(struct request_queue *q)
+{
+ struct rbd_device *rbd_dev = q->queuedata;
+ struct request *rq;
+ struct bio_pair *bp = NULL;
+
+ rq = blk_fetch_request(q);
+
+ while (1) {
+ struct bio *bio;
+ struct bio *rq_bio, *next_bio = NULL;
+ bool do_write, do_flush;
+ int size, op_size = 0;
+ u64 ofs;
+
+ /* peek at request from block layer */
+ if (!rq)
+ break;
+
+ dout("fetched request\n");
+
+ /* filter out block requests we don't understand */
+ if (!blk_fs_request(rq) && !blk_barrier_rq(rq)) {
+ blk_end_request_all(rq, 0);
+ continue;
+ }
+
+ /* deduce our operation (read, write, flush) */
+ /* I wish the block layer simplified cmd_type/cmd_flags/cmd[]
+ * into a clearly defined set of RPC commands:
+ * read, write, flush, scsi command, power mgmt req,
+ * driver-specific, etc.
+ */
+ do_flush = (rq->special == (void *) 0xdeadbeefUL);
+ do_write = (rq_data_dir(rq) == WRITE);
+
+ size = blk_rq_bytes(rq);
+ ofs = blk_rq_pos(rq) * 512ULL;
+ rq_bio = rq->bio;
+
+ spin_unlock_irq(q->queue_lock);
+
+ dout("%s 0x%x bytes at 0x%llx\n",
+ do_flush ? "flush" : (do_write ? "write" : "read"),
+ size, blk_rq_pos(rq) * 512ULL);
+
+ do {
+ if (!do_flush) { /* osd_flush does not use a bio */
+ /* a bio clone to be passed down to OSD req */
+ dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
+ op_size = rbd_get_segment(rbd_dev->header,
+ rbd_dev->obj,
+ ofs, size,
+ NULL, NULL);
+ bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
+ op_size, GFP_ATOMIC);
+ if (!bio)
+ break;
+ } else {
+ bio = NULL;
+ }
+
+ /* init OSD command: flush, write or read */
+ if (do_flush)
+ rbd_req_flush_object(rq, rbd_dev->obj,
+ 0, 0);
+ else if (do_write)
+ rbd_req_write(rq, rbd_dev,
+ ofs,
+ op_size, bio);
+ else
+ rbd_req_read(rq, rbd_dev,
+ ofs,
+ op_size, bio);
+
+ size -= op_size;
+ ofs += op_size;
+
+ rq_bio = next_bio;
+ } while (size > 0);
+
+ if (bp)
+ bio_pair_release(bp);
+
+ spin_lock_irq(q->queue_lock);
+
+ /* remove the special 'flush' marker, now that the command
+ * is executing */
+ rq->special = NULL;
+
+ rq = blk_fetch_request(q);
+ }
+}
+
+/*
+ * a queue callback. Makes sure that we don't create a bio that spans across
+ * multiple osd objects. One exception would be with a single page bios,
+ * which we handle later at bio_chain_clone
+ */
+static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
+ struct bio_vec *bvec)
+{
+ sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
+ unsigned int chunk_sectors = (RBD_STRIPE_UNIT >> 9);
+ unsigned int bio_sectors = bmd->bi_size >> 9;
+ int max;
+
+ max = (chunk_sectors - ((sector & (chunk_sectors - 1))
+ + bio_sectors)) << 9;
+ if (max < 0)
+ max = 0; /* bio_add cannot handle a negative return */
+ if (max <= bvec->bv_len && bio_sectors == 0)
+ return bvec->bv_len;
+ return max;
+}
+
+static void rbd_prepare_flush(struct request_queue *q, struct request *rq)
+{
+ /*
+ * add driver-specific marker, to indicate that this request
+ * is a flush command
+ */
+ rq->special = (void *) 0xdeadbeefUL;
+}
+
+static void rbd_free_disk(struct rbd_device *rbd_dev)
+{
+ struct gendisk *disk = rbd_dev->disk;
+
+ if (!disk)
+ return;
+
+ if (disk->flags & GENHD_FL_UP)
+ del_gendisk(disk);
+ if (disk->queue)
+ blk_cleanup_queue(disk->queue);
+ put_disk(disk);
+}
+
+static int rbd_read_header(struct rbd_device *rbd_dev,
+ struct rbd_obj_header **header)
+{
+ ssize_t rc;
+ char *obj_md_name;
+ struct rbd_obj_header_ondisk *dh;
+ int snap_count = 0;
+
+ obj_md_name = kmalloc(strlen(rbd_dev->obj) + sizeof(RBD_SUFFIX),
+ GFP_KERNEL);
+ if (!obj_md_name)
+ return -ENOMEM;
+ sprintf(obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
+
+ while (1) {
+ dh = kmalloc(sizeof(*dh) + snap_count * sizeof(u32),
+ GFP_KERNEL);
+ if (!dh)
+ goto out_obj_md;
+
+ rc = rbd_req_sync_read(rbd_dev,
+ obj_md_name,
+ 0, sizeof(*dh),
+ (char *)dh);
+ if (rc < 0)
+ goto out_dh;
+
+ rc = rbd_header_from_disk(header, dh, GFP_KERNEL);
+ if (rc < 0)
+ goto out_dh;
+
+ if (snap_count != (*header)->snap_count) {
+ snap_count = (*header)->snap_count;
+ kfree(*header);
+ kfree(dh);
+ continue;
+ }
+ break;
+ }
+
+out_dh:
+ kfree(dh);
+out_obj_md:
+ kfree(obj_md_name);
+ return rc;
+}
+
+static int rbd_init_disk(struct rbd_device *rbd_dev)
+{
+ struct gendisk *disk;
+ struct request_queue *q;
+ int rc;
+ u64 total_size;
+
+ /* contact OSD, request size info about the object being mapped */
+ rc = rbd_read_header(rbd_dev, &rbd_dev->header);
+ if (rc)
+ return rc;
+
+ total_size = rbd_dev->header->image_size;
+
+ /* create gendisk info */
+ rc = -ENOMEM;
+ disk = alloc_disk(RBD_MINORS_PER_MAJOR);
+ if (!disk)
+ goto out;
+
+ sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
+ disk->major = rbd_dev->major;
+ disk->first_minor = 0;
+ disk->fops = &rbd_bd_ops;
+ disk->private_data = rbd_dev;
+
+ /* init rq */
+ rc = -ENOMEM;
+ q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
+ if (!q)
+ goto out_disk;
+ blk_queue_ordered(q, QUEUE_ORDERED_DRAIN_FLUSH, rbd_prepare_flush);
+ blk_queue_merge_bvec(q, rbd_merge_bvec);
+ disk->queue = q;
+
+ q->queuedata = rbd_dev;
+
+ rbd_dev->disk = disk;
+ rbd_dev->q = q;
+
+ /* finally, announce the disk to the world */
+ set_capacity(disk, total_size / 512ULL);
+ add_disk(disk);
+
+ pr_info("%s: added with size 0x%llx\n",
+ disk->disk_name, (unsigned long long)total_size);
+ return 0;
+
+out_disk:
+ put_disk(disk);
+out:
+ return rc;
+}
+
+/********************************************************************
+ * /sys/class/rbd/
+ * add map rados objects to blkdev
+ * remove unmap rados objects
+ * list show mappings
+ *******************************************************************/
+
+static void class_rbd_release(struct class *cls)
+{
+ kfree(cls);
+}
+
+static ssize_t class_rbd_list(struct class *c,
+ struct class_attribute *attr,
+ char *data)
+{
+ int n = 0;
+ struct list_head *tmp;
+
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+ list_for_each(tmp, &rbddev_list) {
+ struct rbd_device *rbd_dev;
+
+ rbd_dev = list_entry(tmp, struct rbd_device, node);
+ n += sprintf(data+n, "%d %d %d %s\n",
+ rbd_dev->id,
+ rbd_dev->major,
+ rbd_dev->poolid,
+ rbd_dev->obj);
+ }
+
+ mutex_unlock(&ctl_mutex);
+ return n;
+}
+
+static ssize_t class_rbd_add(struct class *c,
+ struct class_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct rbd_device *rbd_dev;
+ ssize_t rc = -ENOMEM;
+ int irc, new_id = 0;
+ struct list_head *tmp;
+ char *mon_dev_name;
+ char *opt;
+
+ if (!try_module_get(THIS_MODULE))
+ return -ENODEV;
+
+ mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
+ if (!mon_dev_name)
+ goto err_out_mod;
+
+ opt = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
+ if (!opt)
+ goto err_mon_dev;
+
+ /* new rbd_device object */
+ rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
+ if (!rbd_dev)
+ goto err_out_opt;
+
+ /* static rbd_device initialization */
+ spin_lock_init(&rbd_dev->lock);
+ INIT_LIST_HEAD(&rbd_dev->node);
+
+ /* generate unique id: find highest unique id, add one */
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+ list_for_each(tmp, &rbddev_list) {
+ struct rbd_device *rbd_dev;
+
+ rbd_dev = list_entry(tmp, struct rbd_device, node);
+ if (rbd_dev->id >= new_id)
+ new_id = rbd_dev->id + 1;
+ }
+
+ rbd_dev->id = new_id;
+
+ /* add to global list */
+ list_add_tail(&rbd_dev->node, &rbddev_list);
+
+ /* parse add command */
+ if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
+ "%" __stringify(RBD_MAX_OPT_LEN) "s "
+ "%d %" __stringify(RBD_MAX_OBJ_NAME_SIZE) "s",
+ mon_dev_name, opt, &rbd_dev->poolid,
+ rbd_dev->obj) != 4) {
+ rc = -EINVAL;
+ goto err_out_slot;
+ }
+
+ rbd_dev->obj_len = strlen(rbd_dev->obj);
+
+ /* initialize rest of new object */
+ snprintf(rbd_dev->name, DEV_NAME_LEN, "%d", rbd_dev->id);
+ rc = rbd_get_client(rbd_dev, mon_dev_name, opt);
+ if (rc < 0)
+ goto err_out_slot;
+
+ mutex_unlock(&ctl_mutex);
+
+ /* register our block device */
+ irc = register_blkdev(0, rbd_dev->name);
+ if (irc < 0) {
+ rc = irc;
+ goto err_out_slot;
+ }
+ rbd_dev->major = irc;
+
+ /* set up and announce blkdev mapping */
+ rc = rbd_init_disk(rbd_dev);
+ if (rc)
+ goto err_out_blkdev;
+
+ return count;
+
+err_out_blkdev:
+ unregister_blkdev(rbd_dev->major, rbd_dev->name);
+err_out_slot:
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+ list_del_init(&rbd_dev->node);
+ mutex_unlock(&ctl_mutex);
+
+ kfree(rbd_dev);
+err_out_opt:
+ kfree(opt);
+err_mon_dev:
+ kfree(mon_dev_name);
+err_out_mod:
+ dout("Error adding device %s\n", buf);
+ module_put(THIS_MODULE);
+ return rc;
+}
+
+static ssize_t class_rbd_remove(struct class *c,
+ struct class_attribute *attr,
+ const char *buf,
+ size_t count)
+{
+ struct rbd_device *rbd_dev = NULL;
+ int target_id, rc;
+ unsigned long ul;
+ struct list_head *tmp;
+
+ rc = strict_strtoul(buf, 10, &ul);
+ if (rc)
+ return rc;
+
+ /* convert to int; abort if we lost anything in the conversion */
+ target_id = (int) ul;
+ if (target_id != ul)
+ return -EINVAL;
+
+ /* remove object from list immediately */
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+ list_for_each(tmp, &rbddev_list) {
+ rbd_dev = list_entry(tmp, struct rbd_device, node);
+ if (rbd_dev->id == target_id) {
+ list_del_init(&rbd_dev->node);
+ break;
+ }
+ rbd_dev = NULL;
+ }
+
+ mutex_unlock(&ctl_mutex);
+
+ if (!rbd_dev)
+ return -ENOENT;
+
+ rbd_put_client(rbd_dev);
+
+ /* clean up and free blkdev and associated OSD connection */
+ rbd_free_disk(rbd_dev);
+ unregister_blkdev(rbd_dev->major, rbd_dev->name);
+ kfree(rbd_dev);
+
+ /* release module ref */
+ module_put(THIS_MODULE);
+
+ return count;
+}
+
+static struct class_attribute class_rbd_attrs[] = {
+ __ATTR(add, 0200, NULL, class_rbd_add),
+ __ATTR(remove, 0200, NULL, class_rbd_remove),
+ __ATTR(list, 0444, class_rbd_list, NULL),
+ __ATTR_NULL
+};
+
+/*
+ * create control files in sysfs
+ * /sys/class/rbd/...
+ */
+static int rbd_sysfs_init(void)
+{
+ int ret = -ENOMEM;
+
+ class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
+ if (!class_rbd)
+ goto out;
+
+ class_rbd->name = DRV_NAME;
+ class_rbd->owner = THIS_MODULE;
+ class_rbd->class_release = class_rbd_release;
+ class_rbd->class_attrs = class_rbd_attrs;
+
+ ret = class_register(class_rbd);
+ if (ret)
+ goto out_class;
+ return 0;
+
+out_class:
+ kfree(class_rbd);
+ class_rbd = NULL;
+ pr_err(DRV_NAME ": failed to create class rbd\n");
+out:
+ return ret;
+}
+
+static void rbd_sysfs_cleanup(void)
+{
+ if (class_rbd)
+ class_destroy(class_rbd);
+ class_rbd = NULL;
+}
+
+int __init rbd_init(void)
+{
+ int rc;
+
+ rc = rbd_sysfs_init();
+ if (rc)
+ return rc;
+ spin_lock_init(&node_lock);
+ pr_info("loaded " DRV_NAME_LONG);
+ return 0;
+}
+
+void __exit rbd_exit(void)
+{
+ rbd_sysfs_cleanup();
+}
diff --git a/fs/ceph/rbd.h b/fs/ceph/rbd.h
new file mode 100644
index 0000000..68e0a5c
--- /dev/null
+++ b/fs/ceph/rbd.h
@@ -0,0 +1,8 @@
+#ifndef _FS_CEPH_RBD
+#define _FS_CEPH_RBD
+
+extern void rbd_set_osdc(struct ceph_osd_client *o);
+extern int __init rbd_init(void);
+extern void __exit rbd_exit(void);
+
+#endif
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 52e9e86..9a73b25 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -17,6 +17,7 @@
#include "super.h"
#include "mon_client.h"
#include "auth.h"
+#include "rbd.h"

/*
* Ceph superblock operations
@@ -1079,8 +1080,14 @@ static int __init init_ceph(void)
pr_info("loaded %d.%d.%d (mon/mds/osd proto %d/%d/%d)\n",
CEPH_VERSION_MAJOR, CEPH_VERSION_MINOR, CEPH_VERSION_PATCH,
CEPH_MONC_PROTOCOL, CEPH_MDSC_PROTOCOL, CEPH_OSDC_PROTOCOL);
+
+ ret = rbd_init();
+ if (ret)
+ goto out_fs;
return 0;

+out_fs:
+ unregister_filesystem(&ceph_fs_type);
out_icache:
destroy_caches();
out_msgr:
@@ -1094,6 +1101,7 @@ out:
static void __exit exit_ceph(void)
{
dout("exit_ceph\n");
+ rbd_exit();
unregister_filesystem(&ceph_fs_type);
ceph_caps_finalize();
destroy_caches();
--
1.5.6.5

2010-04-13 23:52:51

by Yehuda Sadeh

Subject: [PATCH 2/6] ceph: refactor osdc requests creation functions

OSD request creation is decoupled from the
vino parameter, allowing osd clients to use other
arbitrary object names that are not necessarily
vino based.
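
With this split, a client that wants a non-vino name can allocate
the request first and fill in the object name itself. A sketch of
the intended use (the object name and surrounding setup are
illustrative):

    struct ceph_osd_request *req;

    req = ceph_osdc_alloc_request(osdc, flags, NULL /* snapc */,
                                  0 /* do_sync */, false /* mempool */,
                                  GFP_NOFS, NULL /* pages */);

    /* name the object directly instead of deriving it from a vino */
    strncpy(req->r_oid, "foo.000000000000", sizeof(req->r_oid));
    req->r_oid_len = strlen(req->r_oid);

    ceph_osdc_build_request(req, off, &len, CEPH_OSD_OP_READ,
                            NULL /* snapc */, 0 /* do_sync */,
                            0, 0 /* truncate seq/size */, NULL /* mtime */,
                            req->r_oid, req->r_oid_len);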

Signed-off-by: Yehuda Sadeh <[email protected]>
---
fs/ceph/osd_client.c | 179 +++++++++++++++++++++++++++++++++++---------------
fs/ceph/osd_client.h | 24 +++++++
2 files changed, 149 insertions(+), 54 deletions(-)

diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 356e61a..5927bc3 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -22,6 +22,32 @@ static int __kick_requests(struct ceph_osd_client *osdc,

static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);

+void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+ struct ceph_file_layout *layout,
+ u64 off, u64 len, u64 *bno,
+ struct ceph_osd_request *req)
+{
+ struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
+ struct ceph_osd_op *op = (void *)(reqhead + 1);
+ u64 orig_len = len;
+ u64 objoff, objlen; /* extent in object */
+
+ /* object extent? */
+ ceph_calc_file_object_mapping(layout, off, &len, bno,
+ &objoff, &objlen);
+ if (len < orig_len)
+ dout(" skipping last %llu, final file extent %llu~%llu\n",
+ orig_len - len, off, len);
+
+ op->extent.offset = cpu_to_le64(objoff);
+ op->extent.length = cpu_to_le64(objlen);
+ req->r_num_pages = calc_pages_for(off, len);
+
+ dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
+ *bno, objoff, objlen, req->r_num_pages);
+
+}
+
/*
* Implement client access to distributed object storage cluster.
*
@@ -53,29 +79,13 @@ static void calc_layout(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
- struct ceph_osd_op *op = (void *)(reqhead + 1);
- u64 orig_len = *plen;
- u64 objoff, objlen; /* extent in object */
u64 bno;

reqhead->snapid = cpu_to_le64(vino.snap);
-
- /* object extent? */
- ceph_calc_file_object_mapping(layout, off, plen, &bno,
- &objoff, &objlen);
- if (*plen < orig_len)
- dout(" skipping last %llu, final file extent %llu~%llu\n",
- orig_len - *plen, off, *plen);
+ ceph_calc_raw_layout(osdc, layout, off, *plen, &bno, req);

sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
req->r_oid_len = strlen(req->r_oid);
-
- op->extent.offset = cpu_to_le64(objoff);
- op->extent.length = cpu_to_le64(objlen);
- req->r_num_pages = calc_pages_for(off, *plen);
-
- dout("calc_layout %s (%d) %llu~%llu (%d pages)\n",
- req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
}

/*
@@ -108,43 +118,34 @@ void ceph_osdc_release_request(struct kref *kref)
kfree(req);
}

-/*
- * build new request AND message, calculate layout, and adjust file
- * extent as needed.
- *
- * if the file was recently truncated, we include information about its
- * old and new size so that the object can be updated appropriately. (we
- * avoid synchronously deleting truncated objects because it's slow.)
- *
- * if @do_sync, include a 'startsync' command so that the osd will flush
- * data quickly.
- */
-struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
- struct ceph_file_layout *layout,
- struct ceph_vino vino,
- u64 off, u64 *plen,
- int opcode, int flags,
+struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
+ int flags,
struct ceph_snap_context *snapc,
int do_sync,
- u32 truncate_seq,
- u64 truncate_size,
- struct timespec *mtime,
- bool use_mempool, int num_reply)
+ bool use_mempool,
+ gfp_t gfp_flags,
+ struct page **pages)
{
struct ceph_osd_request *req;
struct ceph_msg *msg;
- struct ceph_osd_request_head *head;
- struct ceph_osd_op *op;
- void *p;
int num_op = 1 + do_sync;
- size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
- int i;
+ size_t msg_size = sizeof(struct ceph_osd_request_head) +
+ num_op*sizeof(struct ceph_osd_op);
+
+ if (use_mempool) {
+ req = mempool_alloc(osdc->req_mempool, gfp_flags);
+ memset(req, 0, sizeof(*req));
+ } else {
+ req = kzalloc(sizeof(*req), gfp_flags);
+ }
+ if (!req)
+ return NULL;

if (use_mempool) {
- req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
+ req = mempool_alloc(osdc->req_mempool, gfp_flags);
memset(req, 0, sizeof(*req));
} else {
- req = kzalloc(sizeof(*req), GFP_NOFS);
+ req = kzalloc(sizeof(*req), gfp_flags);
}
if (req == NULL)
return NULL;
@@ -164,7 +165,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
else
msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
- OSD_OPREPLY_FRONT_LEN, GFP_NOFS);
+ OSD_OPREPLY_FRONT_LEN, gfp_flags);
if (!msg) {
ceph_osdc_put_request(req);
return NULL;
@@ -178,18 +179,48 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
else
- msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS);
+ msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
if (!msg) {
ceph_osdc_put_request(req);
return NULL;
}
msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
memset(msg->front.iov_base, 0, msg->front.iov_len);
+
+ req->r_request = msg;
+ req->r_pages = pages;
+
+ return req;
+}
+
+/*
+ * build new request AND message
+ *
+ */
+void ceph_osdc_build_request(struct ceph_osd_request *req,
+ u64 off, u64 *plen,
+ int opcode,
+ struct ceph_snap_context *snapc,
+ int do_sync,
+ u32 truncate_seq,
+ u64 truncate_size,
+ struct timespec *mtime,
+ const char *oid,
+ int oid_len)
+{
+ struct ceph_msg *msg = req->r_request;
+ struct ceph_osd_request_head *head;
+ struct ceph_osd_op *op;
+ void *p;
+ int num_op = 1 + do_sync;
+ size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
+ int i;
+ int flags = req->r_flags;
+
head = msg->front.iov_base;
op = (void *)(head + 1);
p = (void *)(op + num_op);

- req->r_request = msg;
req->r_snapc = ceph_get_snap_context(snapc);

head->client_inc = cpu_to_le32(1); /* always, for now. */
@@ -199,10 +230,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
head->num_ops = cpu_to_le16(num_op);
op->op = cpu_to_le16(opcode);

- /* calculate max write size */
- calc_layout(osdc, vino, layout, off, plen, req);
- req->r_file_layout = *layout; /* keep a copy */
-
if (flags & CEPH_OSD_FLAG_WRITE) {
req->r_request->hdr.data_off = cpu_to_le16(off);
req->r_request->hdr.data_len = cpu_to_le32(*plen);
@@ -212,9 +239,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
op->extent.truncate_seq = cpu_to_le32(truncate_seq);

/* fill in oid */
- head->object_len = cpu_to_le32(req->r_oid_len);
- memcpy(p, req->r_oid, req->r_oid_len);
- p += req->r_oid_len;
+ head->object_len = cpu_to_le32(oid_len);
+ memcpy(p, oid, oid_len);
+ p += oid_len;

if (do_sync) {
op++;
@@ -233,6 +260,50 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
msg_size = p - msg->front.iov_base;
msg->front.iov_len = msg_size;
msg->hdr.front_len = cpu_to_le32(msg_size);
+ return;
+}
+
+/*
+ * build new request AND message, calculate layout, and adjust file
+ * extent as needed.
+ *
+ * if the file was recently truncated, we include information about its
+ * old and new size so that the object can be updated appropriately. (we
+ * avoid synchronously deleting truncated objects because it's slow.)
+ *
+ * if @do_sync, include a 'startsync' command so that the osd will flush
+ * data quickly.
+ */
+struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
+ struct ceph_file_layout *layout,
+ struct ceph_vino vino,
+ u64 off, u64 *plen,
+ int opcode, int flags,
+ struct ceph_snap_context *snapc,
+ int do_sync,
+ u32 truncate_seq,
+ u64 truncate_size,
+ struct timespec *mtime,
+ bool use_mempool, int num_reply)
+{
+ struct ceph_osd_request *req =
+ ceph_osdc_alloc_request(osdc, flags,
+ snapc, do_sync,
+ use_mempool,
+ GFP_NOFS, NULL);
+ if (IS_ERR(req))
+ return req;
+
+ /* calculate max write size */
+ calc_layout(osdc, vino, layout, off, plen, req);
+ req->r_file_layout = *layout; /* keep a copy */
+
+ ceph_osdc_build_request(req, off, plen, opcode,
+ snapc, do_sync,
+ truncate_seq, truncate_size,
+ mtime,
+ req->r_oid, req->r_oid_len);
+
return req;
}

diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index b075991..0b6b697 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -118,6 +118,30 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
struct ceph_msg *msg);

+extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+ struct ceph_file_layout *layout,
+ u64 off, u64 len, u64 *bno,
+ struct ceph_osd_request *req);
+
+extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
+ int flags,
+ struct ceph_snap_context *snapc,
+ int do_sync,
+ bool use_mempool,
+ gfp_t gfp_flags,
+ struct page **pages);
+
+extern void ceph_osdc_build_request(struct ceph_osd_request *req,
+ u64 off, u64 *plen,
+ int opcode,
+ struct ceph_snap_context *snapc,
+ int do_sync,
+ u32 truncate_seq,
+ u64 truncate_size,
+ struct timespec *mtime,
+ const char *oid,
+ int oid_len);
+
extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
struct ceph_file_layout *layout,
struct ceph_vino vino,
--
1.5.6.5

2010-04-13 23:53:08

by Yehuda Sadeh

Subject: [PATCH 1/6] ceph: all allocation functions should get gfp_mask

This is essential, as the rados block device will need
to run in contexts that require allocation flags other
than GFP_NOFS.
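
With the flag threaded through, each caller picks what suits its
context; for instance (the second line is how the rbd code will
use it):

    /* fs writeback path: must not recurse back into the fs */
    msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS);

    /* plain process context (e.g. rbd device setup) */
    pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);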

Signed-off-by: Yehuda Sadeh <[email protected]>
---
fs/ceph/caps.c | 2 +-
fs/ceph/file.c | 10 +++++-----
fs/ceph/mds_client.c | 11 ++++++-----
fs/ceph/messenger.c | 10 +++++-----
fs/ceph/messenger.h | 2 +-
fs/ceph/mon_client.c | 15 ++++++++-------
fs/ceph/msgpool.c | 4 ++--
fs/ceph/osd_client.c | 8 ++++----
8 files changed, 32 insertions(+), 30 deletions(-)

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 2dd11d4..7355a39 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -932,7 +932,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
seq, issue_seq, mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);

- msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc));
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS);
if (!msg)
return -ENOMEM;

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 4024a5d..ef8f9e9 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -316,16 +316,16 @@ void ceph_release_page_vector(struct page **pages, int num_pages)
/*
* allocate a vector new pages
*/
-static struct page **alloc_page_vector(int num_pages)
+struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags)
{
struct page **pages;
int i;

- pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS);
+ pages = kmalloc(sizeof(*pages) * num_pages, flags);
if (!pages)
return ERR_PTR(-ENOMEM);
for (i = 0; i < num_pages; i++) {
- pages[i] = __page_cache_alloc(GFP_NOFS);
+ pages[i] = __page_cache_alloc(flags);
if (pages[i] == NULL) {
ceph_release_page_vector(pages, i);
return ERR_PTR(-ENOMEM);
@@ -539,7 +539,7 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data,
* in sequence.
*/
} else {
- pages = alloc_page_vector(num_pages);
+ pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
}
if (IS_ERR(pages))
return PTR_ERR(pages);
@@ -666,7 +666,7 @@ more:
*/
truncate_inode_pages_range(inode->i_mapping, pos, pos+len);
} else {
- pages = alloc_page_vector(num_pages);
+ pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
if (IS_ERR(pages)) {
ret = PTR_ERR(pages);
goto out;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 5de7c83..9ebdda9 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -664,7 +664,7 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
struct ceph_msg *msg;
struct ceph_mds_session_head *h;

- msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h));
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
if (!msg) {
pr_err("create_session_msg ENOMEM creating msg\n");
return NULL;
@@ -1049,7 +1049,8 @@ static int add_cap_releases(struct ceph_mds_client *mdsc,

while (session->s_num_cap_releases < session->s_nr_caps + extra) {
spin_unlock(&session->s_cap_lock);
- msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
+ GFP_NOFS);
if (!msg)
goto out_unlocked;
dout("add_cap_releases %p msg %p now %d\n", session, msg,
@@ -1414,7 +1415,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
if (req->r_old_dentry_drop)
len += req->r_old_dentry->d_name.len;

- msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
if (!msg) {
msg = ERR_PTR(-ENOMEM);
goto out_free2;
@@ -2138,7 +2139,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
ceph_pagelist_init(pagelist);

err = -ENOMEM;
- reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0);
+ reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
if (!reply)
goto fail_nomsg;

@@ -2443,7 +2444,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
dnamelen = dentry->d_name.len;
len += dnamelen;

- msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
if (!msg)
return;
lease = msg->front.iov_base;
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 15dce2c..b6cff8e 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -2043,11 +2043,11 @@ void ceph_con_keepalive(struct ceph_connection *con)
* construct a new message with given type, size
* the new msg has a ref count of 1.
*/
-struct ceph_msg *ceph_msg_new(int type, int front_len)
+struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
{
struct ceph_msg *m;

- m = kmalloc(sizeof(*m), GFP_NOFS);
+ m = kmalloc(sizeof(*m), flags);
if (m == NULL)
goto out;
kref_init(&m->kref);
@@ -2070,11 +2070,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len)
/* front */
if (front_len) {
if (front_len > PAGE_CACHE_SIZE) {
- m->front.iov_base = __vmalloc(front_len, GFP_NOFS,
+ m->front.iov_base = __vmalloc(front_len, flags,
PAGE_KERNEL);
m->front_is_vmalloc = true;
} else {
- m->front.iov_base = kmalloc(front_len, GFP_NOFS);
+ m->front.iov_base = kmalloc(front_len, flags);
}
if (m->front.iov_base == NULL) {
pr_err("msg_new can't allocate %d bytes\n",
@@ -2149,7 +2149,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
}
if (!msg) {
*skip = 0;
- msg = ceph_msg_new(type, front_len);
+ msg = ceph_msg_new(type, front_len, GFP_NOFS);
if (!msg) {
pr_err("unable to allocate msg type %d len %d\n",
type, front_len);
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 9f0a540..fe24e6f 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -232,7 +232,7 @@ extern void ceph_con_keepalive(struct ceph_connection *con);
extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
extern void ceph_con_put(struct ceph_connection *con);

-extern struct ceph_msg *ceph_msg_new(int type, int front_len);
+extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags);
extern void ceph_msg_kfree(struct ceph_msg *m);


diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index fdc2800..455c973 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -190,7 +190,7 @@ static void __send_subscribe(struct ceph_mon_client *monc)
struct ceph_mon_subscribe_item *i;
void *p, *end;

- msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96);
+ msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS);
if (!msg)
return;

@@ -490,10 +490,10 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
init_completion(&req->completion);

err = -ENOMEM;
- req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h));
+ req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS);
if (!req->request)
goto out;
- req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024);
+ req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS);
if (!req->reply)
goto out;

@@ -632,15 +632,16 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
/* msg pools */
err = -ENOMEM;
monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
- sizeof(struct ceph_mon_subscribe_ack));
+ sizeof(struct ceph_mon_subscribe_ack),
+ GFP_NOFS);
if (!monc->m_subscribe_ack)
goto out_monmap;

- monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096);
+ monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS);
if (!monc->m_auth_reply)
goto out_subscribe_ack;

- monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096);
+ monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS);
monc->pending_auth = 0;
if (!monc->m_auth)
goto out_auth_reply;
@@ -815,7 +816,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_MON_MAP:
case CEPH_MSG_MDS_MAP:
case CEPH_MSG_OSD_MAP:
- m = ceph_msg_new(type, front_len);
+ m = ceph_msg_new(type, front_len, GFP_NOFS);
break;
}

diff --git a/fs/ceph/msgpool.c b/fs/ceph/msgpool.c
index 10d3632..b0cc104 100644
--- a/fs/ceph/msgpool.c
+++ b/fs/ceph/msgpool.c
@@ -11,7 +11,7 @@ static void *alloc_fn(gfp_t gfp_mask, void *arg)
{
struct ceph_msgpool *pool = arg;

- return ceph_msg_new(0, pool->front_len);
+ return ceph_msg_new(0, pool->front_len, gfp_mask);
}

static void free_fn(void *element, void *arg)
@@ -43,7 +43,7 @@ struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool,
WARN_ON(1);

/* try to alloc a fresh message */
- return ceph_msg_new(0, front_len);
+ return ceph_msg_new(0, front_len, GFP_NOFS);
}

return mempool_alloc(pool->pool, GFP_NOFS);
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index e69abe5..356e61a 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -164,7 +164,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
else
msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
- OSD_OPREPLY_FRONT_LEN);
+ OSD_OPREPLY_FRONT_LEN, GFP_NOFS);
if (!msg) {
ceph_osdc_put_request(req);
return NULL;
@@ -178,7 +178,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
else
- msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size);
+ msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS);
if (!msg) {
ceph_osdc_put_request(req);
return NULL;
@@ -1379,7 +1379,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
if (front > req->r_reply->front.iov_len) {
pr_warning("get_reply front %d > preallocated %d\n",
front, (int)req->r_reply->front.iov_len);
- m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front);
+ m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
if (!m)
goto out;
ceph_msg_put(req->r_reply);
@@ -1422,7 +1422,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,

switch (type) {
case CEPH_MSG_OSD_MAP:
- return ceph_msg_new(type, front);
+ return ceph_msg_new(type, front, GFP_NOFS);
case CEPH_MSG_OSD_OPREPLY:
return get_reply(con, hdr, skip);
default:
--
1.5.6.5

2010-04-13 23:53:32

by Yehuda Sadeh

[permalink] [raw]
Subject: [PATCH 3/6] ceph: message layer can send/receive data in bio

This capability is being added so that we won't need to copy
the data when sending it to or from the rados block device.
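
As a sketch of the intended use (the actual rbd caller arrives
later in the series), a block driver can hand a request's bio
chain directly to the osd client instead of allocating and
filling a page vector, via the extended allocation call below:

	/* illustrative only: pass the bio, no page vector */
	req = ceph_osdc_alloc_request(osdc, flags, snapc, do_sync,
				      false, GFP_NOIO, NULL, bio);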

Signed-off-by: Yehuda Sadeh <[email protected]>
---
fs/ceph/messenger.c | 175 +++++++++++++++++++++++++++++++++++++++++--------
fs/ceph/messenger.h | 3 +
fs/ceph/osd_client.c | 14 ++++-
fs/ceph/osd_client.h | 4 +-
4 files changed, 164 insertions(+), 32 deletions(-)

diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index b6cff8e..3f77210 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -8,6 +8,8 @@
#include <linux/net.h>
#include <linux/socket.h>
#include <linux/string.h>
+#include <linux/bio.h>
+#include <linux/blkdev.h>
#include <net/tcp.h>

#include "super.h"
@@ -529,8 +531,11 @@ static void prepare_write_message(struct ceph_connection *con)
if (le32_to_cpu(m->hdr.data_len) > 0) {
/* initialize page iterator */
con->out_msg_pos.page = 0;
- con->out_msg_pos.page_pos =
- le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+ if (m->pages)
+ con->out_msg_pos.page_pos =
+ le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+ else
+ con->out_msg_pos.page_pos = 0;
con->out_msg_pos.data_pos = 0;
con->out_msg_pos.did_page_crc = 0;
con->out_more = 1; /* data + footer will follow */
@@ -712,6 +717,29 @@ out:
return ret; /* done! */
}

+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{
+ if (!bio) {
+ *iter = NULL;
+ *seg = 0;
+ return;
+ }
+ *iter = bio;
+ *seg = bio->bi_idx;
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{
+ if (*bio_iter == NULL)
+ return;
+
+ BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+
+ (*seg)++;
+ if (*seg == (*bio_iter)->bi_vcnt)
+ init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+
/*
* Write as much message data payload as we can. If we finish, queue
* up the footer.
@@ -726,14 +754,20 @@ static int write_partial_msg_pages(struct ceph_connection *con)
size_t len;
int crc = con->msgr->nocrc;
int ret;
+ int page_shift = 0;

dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
con->out_msg_pos.page_pos);

- while (con->out_msg_pos.page < con->out_msg->nr_pages) {
+ if (msg->bio && !msg->bio_iter)
+ init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+
+
+ while (data_len - con->out_msg_pos.data_pos > 0) {
struct page *page = NULL;
void *kaddr = NULL;
+ int max_write = PAGE_SIZE;

/*
* if we are calculating the data crc (the default), we need
@@ -744,18 +778,29 @@ static int write_partial_msg_pages(struct ceph_connection *con)
page = msg->pages[con->out_msg_pos.page];
if (crc)
kaddr = kmap(page);
+ max_write = PAGE_SIZE;
} else if (msg->pagelist) {
page = list_first_entry(&msg->pagelist->head,
struct page, lru);
if (crc)
kaddr = kmap(page);
+ max_write = PAGE_SIZE;
+ } else if (msg->bio) {
+ struct bio_vec *bv;
+
+ bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
+ page = bv->bv_page;
+ page_shift = bv->bv_offset;
+ if (crc)
+ kaddr = kmap(page) + page_shift;
+ max_write = bv->bv_len;
} else {
page = con->msgr->zero_page;
if (crc)
kaddr = page_address(con->msgr->zero_page);
}
- len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
- (int)(data_len - con->out_msg_pos.data_pos));
+ len = min_t(int, max_write - con->out_msg_pos.page_pos,
+ data_len - con->out_msg_pos.data_pos);
if (crc && !con->out_msg_pos.did_page_crc) {
void *base = kaddr + con->out_msg_pos.page_pos;
u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -767,11 +812,12 @@ static int write_partial_msg_pages(struct ceph_connection *con)
}

ret = kernel_sendpage(con->sock, page,
- con->out_msg_pos.page_pos, len,
+ con->out_msg_pos.page_pos + page_shift,
+ len,
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);

- if (crc && (msg->pages || msg->pagelist))
+ if (crc && (msg->pages || msg->pagelist || msg->bio))
kunmap(page);

if (ret <= 0)
@@ -786,6 +832,8 @@ static int write_partial_msg_pages(struct ceph_connection *con)
if (msg->pagelist)
list_move_tail(&page->lru,
&msg->pagelist->head);
+ if (msg->bio)
+ iter_bio_next(&msg->bio_iter, &msg->bio_seg);
}
}

@@ -1291,8 +1339,7 @@ static int read_partial_message_section(struct ceph_connection *con,
struct kvec *section, unsigned int sec_len,
u32 *crc)
{
- int left;
- int ret;
+ int ret, left;

BUG_ON(!section);

@@ -1315,13 +1362,79 @@ static int read_partial_message_section(struct ceph_connection *con,
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);
+
+
+static int read_partial_message_pages(struct ceph_connection *con, struct page **pages, unsigned data_len, int datacrc)
+{
+ void *p;
+ int ret;
+ int left;
+
+ left = min((int)(data_len - con->in_msg_pos.data_pos),
+ (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+ /* (page) data */
+ BUG_ON(pages == NULL);
+ p = kmap(pages[con->in_msg_pos.page]);
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
+ if (ret > 0 && datacrc)
+ con->in_data_crc =
+ crc32c(con->in_data_crc,
+ p + con->in_msg_pos.page_pos, ret);
+ kunmap(pages[con->in_msg_pos.page]);
+ if (ret <= 0)
+ return ret;
+ con->in_msg_pos.data_pos += ret;
+ con->in_msg_pos.page_pos += ret;
+ if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+ con->in_msg_pos.page_pos = 0;
+ con->in_msg_pos.page++;
+ }
+
+ return ret;
+}
+
+static int read_partial_message_bio(struct ceph_connection *con,
+ struct bio **bio_iter, int *bio_seg,
+ unsigned data_len, int datacrc)
+{
+ struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
+ void *p;
+ int ret, left;
+
+ if (IS_ERR(bv))
+ return PTR_ERR(bv);
+
+ left = min((int)(data_len - con->in_msg_pos.data_pos),
+ (int)(bv->bv_len - con->in_msg_pos.page_pos));
+
+ p = kmap(bv->bv_page) + bv->bv_offset;
+
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
+ if (ret > 0 && datacrc)
+ con->in_data_crc =
+ crc32c(con->in_data_crc,
+ p + con->in_msg_pos.page_pos, ret);
+ kunmap(bv->bv_page);
+ if (ret <= 0)
+ return ret;
+ con->in_msg_pos.data_pos += ret;
+ con->in_msg_pos.page_pos += ret;
+ if (con->in_msg_pos.page_pos == bv->bv_len) {
+ con->in_msg_pos.page_pos = 0;
+ iter_bio_next(bio_iter, bio_seg);
+ }
+
+ return ret;
+}
+
/*
* read (part of) a message.
*/
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
- void *p;
int ret;
int to, left;
unsigned front_len, middle_len, data_len, data_off;
@@ -1385,7 +1498,10 @@ static int read_partial_message(struct ceph_connection *con)
m->middle->vec.iov_len = 0;

con->in_msg_pos.page = 0;
- con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ if (m->pages)
+ con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ else
+ con->in_msg_pos.page_pos = 0;
con->in_msg_pos.data_pos = 0;
}

@@ -1402,27 +1518,25 @@ static int read_partial_message(struct ceph_connection *con)
if (ret <= 0)
return ret;
}
+ if (m->bio && !m->bio_iter)
+ init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);

/* (page) data */
while (con->in_msg_pos.data_pos < data_len) {
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
- BUG_ON(m->pages == NULL);
- p = kmap(m->pages[con->in_msg_pos.page]);
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
- if (ret > 0 && datacrc)
- con->in_data_crc =
- crc32c(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(m->pages[con->in_msg_pos.page]);
- if (ret <= 0)
- return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == PAGE_SIZE) {
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.page++;
+ if (m->pages) {
+ ret = read_partial_message_pages(con, m->pages,
+ data_len, datacrc);
+ if (ret <= 0)
+ return ret;
+ } else if (m->bio) {
+
+ ret = read_partial_message_bio(con,
+ &m->bio_iter, &m->bio_seg,
+ data_len, datacrc);
+ if (ret <= 0)
+ return ret;
+ } else {
+ BUG_ON(1);
}
}

@@ -2093,6 +2207,9 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
m->nr_pages = 0;
m->pages = NULL;
m->pagelist = NULL;
+ m->bio = NULL;
+ m->bio_iter = NULL;
+ m->bio_seg = 0;

dout("ceph_msg_new %p front %d\n", m, front_len);
return m;
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index fe24e6f..1f5bb9d 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -84,6 +84,9 @@ struct ceph_msg {
struct ceph_pagelist *pagelist; /* instead of pages */
struct list_head list_head;
struct kref kref;
+ struct bio *bio; /* instead of pages/pagelist */
+ struct bio *bio_iter; /* bio iterator */
+ int bio_seg; /* current bio segment */
bool front_is_vmalloc;
bool more_to_follow;
int front_max;
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 5927bc3..7469683 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -6,6 +6,7 @@
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
+#include <linux/bio.h>

#include "super.h"
#include "osd_client.h"
@@ -111,6 +112,8 @@ void ceph_osdc_release_request(struct kref *kref)
if (req->r_own_pages)
ceph_release_page_vector(req->r_pages,
req->r_num_pages);
+ if (req->r_bio)
+ bio_put(req->r_bio);
ceph_put_snap_context(req->r_snapc);
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
@@ -124,7 +127,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
int do_sync,
bool use_mempool,
gfp_t gfp_flags,
- struct page **pages)
+ struct page **pages,
+ struct bio *bio)
{
struct ceph_osd_request *req;
struct ceph_msg *msg;
@@ -189,6 +193,10 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,

req->r_request = msg;
req->r_pages = pages;
+ if (bio) {
+ req->r_bio = bio;
+ bio_get(req->r_bio);
+ }

return req;
}
@@ -290,7 +298,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
ceph_osdc_alloc_request(osdc, flags,
snapc, do_sync,
use_mempool,
- GFP_NOFS, NULL);
+ GFP_NOFS, NULL, NULL);
if (IS_ERR(req))
return req;

@@ -1156,6 +1164,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,

req->r_request->pages = req->r_pages;
req->r_request->nr_pages = req->r_num_pages;
+ req->r_request->bio = req->r_bio;

register_request(osdc, req);

@@ -1472,6 +1481,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
}
m->pages = req->r_pages;
m->nr_pages = req->r_num_pages;
+ m->bio = req->r_bio;
}
*skip = 0;
req->r_con_filling_msg = ceph_con_get(con);
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 0b6b697..76aa63e 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -79,6 +79,7 @@ struct ceph_osd_request {
struct page **r_pages; /* pages for data payload */
int r_pages_from_pool;
int r_own_pages; /* if true, i own page list */
+ struct bio *r_bio; /* instead of pages */
};

struct ceph_osd_client {
@@ -129,7 +130,8 @@ extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *
int do_sync,
bool use_mempool,
gfp_t gfp_flags,
- struct page **pages);
+ struct page **pages,
+ struct bio *bio);

extern void ceph_osdc_build_request(struct ceph_osd_request *req,
u64 off, u64 *plen,
--
1.5.6.5

2010-04-14 09:57:56

by Andi Kleen

[permalink] [raw]
Subject: Re: [PATCH 5/6] ceph: refactor mount related functions, add helpers

Yehuda Sadeh <[email protected]> writes:
>
> +int ceph_copy_to_page_vector(struct page **pages,
> + const char *data,
> + loff_t off, size_t len)
> +{
> + int i = 0;
> + int po = off & ~PAGE_CACHE_MASK;
> + int left = len;
> + int l;
> +
> + while (left > 0) {
> + l = min_t(int, PAGE_CACHE_SIZE-po, left);

int seems like exactly the wrong type here.

-Andi

--
[email protected] -- Speaking for myself only.

2010-04-14 17:38:28

by Yehuda Sadeh

[permalink] [raw]
Subject: Re: [PATCH 5/6] ceph: refactor mount related functions, add helpers

On Wed, 2010-04-14 at 11:57 +0200, Andi Kleen wrote:
> Yehuda Sadeh <[email protected]> writes:
> >
> > +int ceph_copy_to_page_vector(struct page **pages,
> > + const char *data,
> > + loff_t off, size_t len)
> > +{
> > + int i = 0;
> > + int po = off & ~PAGE_CACHE_MASK;
> > + int left = len;
> > + int l;
> > +
> > + while (left > 0) {
> > + l = min_t(int, PAGE_CACHE_SIZE-po, left);
>
> int seems like exactly the wrong type here.

Ah yes, len is a size_t, so truncating it through int invites
sign and overflow problems; there are other places around there
that need fixing too. Changing to size_t:

@@ -362,6 +362,52 @@ static int copy_user_to_page_vector(struct page **pages,
return len;
}

+int ceph_copy_to_page_vector(struct page **pages,
+ const char *data,
+ loff_t off, size_t len)
+{
+ int i = 0;
+ size_t po = off & ~PAGE_CACHE_MASK;
+ size_t left = len;
+ size_t l;
+
+ while (left > 0) {
+ l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+ memcpy(page_address(pages[i]) + po, data, l);
+ data += l;
+ left -= l;
+ po += l;
+ if (po == PAGE_CACHE_SIZE) {
+ po = 0;
+ i++;
+ }
+ }
+ return len;
+}
+
+int ceph_copy_from_page_vector(struct page **pages,
+ char *data,
+ loff_t off, size_t len)
+{
+ int i = 0;
+ size_t po = off & ~PAGE_CACHE_MASK;
+ size_t left = len;
+ size_t l;
+
+ while (left > 0) {
+ l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+ memcpy(data, page_address(pages[i]) + po, l);
+ data += l;
+ left -= l;
+ po += l;
+ if (po == PAGE_CACHE_SIZE) {
+ po = 0;
+ i++;
+ }
+ }
+ return len;
+}
+
/*
* copy user data from a page vector into a user pointer
*/
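
For reference, a sketch of how a caller would pair these helpers
with the page vector allocation (hypothetical usage, not part of
the patch itself):

	pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
	if (IS_ERR(pages))
		return PTR_ERR(pages);
	ceph_copy_to_page_vector(pages, data, off, len);
	/* ... build and send the osd request ... */
	ceph_release_page_vector(pages, num_pages);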

Thanks,
Yehuda