This patch implements punch hole (fallocate) support for Ceph kernel
file system client.
We prepared two patches based on different kernel versions, one against
kernel 3.8-rc3, the other against the latest 3.10-rc5. It is because
unfortunately, we failed to set up a workable Ceph system with the client
based on the lastest code from Linux kernel git tree, for the server
side, we tried both the latest code from Ceph git tree and
the latest v0.61.3 release. The client will easily hang there without
any response, unless rebooting the machine.
We managed to set up a Ceph system with the client based on Linux
kernel 3.8-rc3 and the server based on Ceph v0.61.3, so the patch
against v3.8-rc3 has been under preliminary tests. However, the one
against v3.10-rc5 not.
Comments are appreciated.
This patch implements punch hole (fallocate) support against
Linux kernel 3.8-rc3.
Signed-off-by: Li Wang <[email protected]>
Signed-off-by: Yunchuan Wen <[email protected]>
---
fs/ceph/file.c | 248 +++++++++++++++++++++++++++++++++++++++++++++++++
net/ceph/osd_client.c | 17 +++-
2 files changed, 260 insertions(+), 5 deletions(-)
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index e51558f..7fb9c6d 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -7,6 +7,7 @@
#include <linux/mount.h>
#include <linux/namei.h>
#include <linux/writeback.h>
+#include <linux/falloc.h>
#include "super.h"
#include "mds_client.h"
@@ -848,6 +849,252 @@ out:
return offset;
}
+static inline void ceph_zero_partial_page(struct inode *inode, pgoff_t index, unsigned start, unsigned size)
+{
+ struct page *page;
+
+ page = find_lock_page(inode->i_mapping, index);
+ if (page) {
+ zero_user(page, start, size);
+ unlock_page(page);
+ page_cache_release(page);
+ }
+}
+
+static void ceph_truncate_and_zero_page_cache(struct inode *inode, loff_t offset, loff_t length)
+{
+ loff_t first_page;
+ loff_t last_page;
+ loff_t zero_len;
+
+ first_page =((offset + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT) << PAGE_CACHE_SHIFT;
+ last_page = ((offset + length) >> PAGE_CACHE_SHIFT) << PAGE_CACHE_SHIFT;
+ if (last_page > first_page) {
+ truncate_pagecache_range(inode, first_page, last_page - 1);
+ }
+ if (first_page > last_page) {
+ ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT, offset & (PAGE_CACHE_SIZE - 1), length);
+ return;
+ }
+ /*
+ * zero out the partial page that contains
+ * the start of the hole
+ */
+ zero_len = first_page - offset;
+ if (zero_len > 0) {
+ ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT, offset & (PAGE_CACHE_SIZE -1), zero_len);
+ }
+ /*
+ * zero out the partial page that contains
+ * the end of the hole
+ */
+ zero_len = offset + length - last_page;
+ if (zero_len > 0) {
+ ceph_zero_partial_page(inode, (offset + length) >> PAGE_CACHE_SHIFT, 0, zero_len);
+ }
+ /*
+ * If i_size is contained in the last page, we need to
+ * zero the partial page after i_size
+ */
+ if (inode->i_size >> PAGE_CACHE_SHIFT == (offset + length) >> PAGE_CACHE_SHIFT && inode->i_size % PAGE_CACHE_SIZE != 0) {
+ zero_len = PAGE_CACHE_SIZE -
+ (inode->i_size & (PAGE_CACHE_SIZE - 1));
+ if (zero_len > 0) {
+ ceph_zero_partial_page(inode, inode->i_size >> PAGE_CACHE_SHIFT, inode->i_size & (PAGE_CACHE_SIZE -1), zero_len);
+ }
+ }
+}
+
+static int ceph_delete_object_range(struct inode *inode, loff_t lstart, loff_t lend)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_osd_request *req;
+ u64 length = ceph_file_layout_object_size(ci->i_layout);
+ loff_t offset;
+ int ret = 0;
+
+ if (lstart > lend || length <= 0)
+ goto out;
+ for (offset = lstart; offset <= lend; offset += length) {
+ req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+ ceph_vino(inode), offset, &length,
+ CEPH_OSD_OP_DELETE, CEPH_OSD_FLAG_ONDISK,
+ NULL,
+ 0,
+ ci->i_truncate_seq, ci->i_truncate_size,
+ NULL, false, 1, 0);
+ if (IS_ERR(req)) {
+ ret = PTR_ERR(req);
+ goto out;
+ }
+
+ ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+ if (!ret) {
+ ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ }
+ ceph_osdc_put_request(req);
+ /* object deleted */
+ if (ret == -ENOENT)
+ ret = 0;
+ }
+
+ out:
+ return ret;
+}
+
+static int ceph_zero_partial_object(struct file *file, loff_t offset, loff_t length)
+{
+ struct ceph_file_info *fi = file->private_data;
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_osd_request *req;
+ struct timespec mtime = CURRENT_TIME;
+ int want, got = 0, ret = 0;
+
+ if (length <= 0)
+ goto out;
+
+
+ if (fi->fmode & CEPH_FILE_MODE_LAZY)
+ want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+ else
+ want = CEPH_CAP_FILE_BUFFER;
+
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, offset+length);
+ if (ret < 0)
+ goto out;
+ if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
+ ceph_put_cap_refs(ci, got);
+ ret = -EAGAIN;
+ goto out;
+ }
+ req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+ ceph_vino(inode), offset, &length,
+ CEPH_OSD_OP_ZERO, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+ NULL,
+ 0,
+ ci->i_truncate_seq, ci->i_truncate_size,
+ &mtime, false, 1, 0);
+ if (IS_ERR(req)) {
+ ret = PTR_ERR(req);
+ goto out;
+ }
+
+ ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+ if (!ret) {
+ ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ }
+ ceph_osdc_put_request(req);
+ ceph_put_cap_refs(ci, got);
+
+ out:
+ return ret;
+}
+
+static int ceph_delete_and_zero_objects(struct file *file, loff_t offset, loff_t length)
+{
+ unsigned long first_object;
+ unsigned long last_object;
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ __s32 object_size;
+ __u32 object_shift;
+ loff_t zero_len;
+ int ret = 0;
+
+ if (!(object_size = ceph_file_layout_object_size(ci->i_layout)))
+ goto out;
+ if (object_size == 1) {
+ object_shift = 0;
+ } else {
+ for (object_shift = 0; ;object_shift++) {
+ if (2 << object_shift == object_size)
+ break;
+ }
+ object_shift++;
+ }
+
+ first_object =((offset + object_size - 1) >> object_shift) << object_shift;
+ last_object = ((offset + length) >> object_shift) << object_shift;
+ if (last_object > first_object) {
+ ret = ceph_delete_object_range(inode, first_object, last_object - 1);
+ if (ret)
+ goto out;
+ }
+ if (first_object > last_object) {
+ ret = ceph_zero_partial_object(file, offset, length);
+ goto out;
+ }
+ /*
+ * zero out the partial object that contains
+ * the start of the hole
+ */
+ zero_len = first_object - offset;
+ if (zero_len > 0) {
+ ret = ceph_zero_partial_object(file, offset, zero_len);
+ if (ret)
+ goto out;
+ }
+ /*
+ * zero out the partial object that contains
+ * the end of the hole
+ */
+ zero_len = offset + length - last_object;
+ if (zero_len > 0) {
+ ret = ceph_zero_partial_object(file, last_object, zero_len);
+ }
+
+ out:
+ return ret;
+}
+
+static int ceph_punch_hole(struct file *file, loff_t offset, loff_t length)
+{
+ struct inode *inode = file->f_dentry->d_inode;
+ int ret = 0;
+
+ if (!S_ISREG(inode->i_mode)) {
+ return -EOPNOTSUPP;
+ }
+ if (IS_SWAPFILE(inode)) {
+ return -ETXTBSY;
+ }
+ mutex_lock(&inode->i_mutex);
+
+ /* No need to punch hole beyond i_size */
+ if (offset >= inode->i_size)
+ goto out_unlock;
+
+ /*
+ * If the hole extends beyond i_size, set the hole
+ * to end after the page that contains i_size
+ */
+ if (offset + length > inode->i_size) {
+ length = inode->i_size +
+ PAGE_CACHE_SIZE - (inode->i_size & (PAGE_CACHE_SIZE - 1)) -
+ offset;
+ }
+
+ ceph_truncate_and_zero_page_cache(inode, offset, length);
+ ret = ceph_delete_and_zero_objects(file, offset, length);
+
+ out_unlock:
+ mutex_unlock(&inode->i_mutex);
+ return ret;
+}
+
+static long ceph_fallocate(struct file *file, int mode, loff_t offset, loff_t length)
+{
+ /* FALLOC_FL_PUNCH_HOLE must be used with FALLOC_FL_KEEP_SIZE */
+ if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
+ return -EOPNOTSUPP;
+ if (mode & FALLOC_FL_PUNCH_HOLE)
+ return ceph_punch_hole(file, offset, length);
+ return -EOPNOTSUPP;
+}
+
const struct file_operations ceph_file_fops = {
.open = ceph_open,
.release = ceph_release,
@@ -864,5 +1111,6 @@ const struct file_operations ceph_file_fops = {
.splice_write = generic_file_splice_write,
.unlocked_ioctl = ceph_ioctl,
.compat_ioctl = ceph_ioctl,
+ .fallocate = ceph_fallocate,
};
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index eb9a444..da69cfd 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -230,7 +230,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
req->r_flags = flags;
- WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
+ WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK)) == 0);
/* create reply message */
if (use_mempool)
@@ -291,14 +291,16 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
switch (src->op) {
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
- dst->extent.offset =
- cpu_to_le64(src->extent.offset);
- dst->extent.length =
- cpu_to_le64(src->extent.length);
dst->extent.truncate_size =
cpu_to_le64(src->extent.truncate_size);
dst->extent.truncate_seq =
cpu_to_le32(src->extent.truncate_seq);
+ case CEPH_OSD_OP_DELETE:
+ case CEPH_OSD_OP_ZERO:
+ dst->extent.length =
+ cpu_to_le64(src->extent.length);
+ dst->extent.offset =
+ cpu_to_le64(src->extent.offset);
break;
case CEPH_OSD_OP_GETXATTR:
@@ -471,6 +473,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
ops[0].extent.truncate_size = truncate_size;
ops[0].payload_len = 0;
+ if (opcode == CEPH_OSD_OP_ZERO || opcode == CEPH_OSD_OP_DELETE) {
+ ops[0].extent.offset = off;
+ ops[0].extent.length = *plen;
+ }
if (do_sync) {
ops[1].op = CEPH_OSD_OP_STARTSYNC;
ops[1].payload_len = 0;
@@ -1181,6 +1187,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
if (req == NULL) {
dout("handle_reply tid %llu dne\n", tid);
mutex_unlock(&osdc->request_mutex);
+ printk(KERN_INFO"handle pm\n");
return;
}
ceph_osdc_get_request(req);
--
1.7.9.5
This patch implements punch hole (fallocate) support against
Linux kernel 3.10-rc5.
Signed-off-by: Li Wang <[email protected]>
Signed-off-by: Yunchuan Wen <[email protected]>
---
fs/ceph/file.c | 245 +++++++++++++++++++++++++++++++++++++++++++++++++
net/ceph/osd_client.c | 8 +-
2 files changed, 251 insertions(+), 2 deletions(-)
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 656e169..e092b69 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -8,6 +8,7 @@
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/aio.h>
+#include <linux/falloc.h>
#include "super.h"
#include "mds_client.h"
@@ -882,6 +883,249 @@ out:
return offset;
}
+static inline void ceph_zero_partial_page(struct inode *inode, pgoff_t index, unsigned start, unsigned size)
+{
+ struct page *page;
+
+ page = find_lock_page(inode->i_mapping, index);
+ if (page) {
+ zero_user(page, start, size);
+ unlock_page(page);
+ page_cache_release(page);
+ }
+}
+
+static void ceph_truncate_and_zero_page_cache(struct inode *inode, loff_t offset, loff_t length)
+{
+ loff_t first_page;
+ loff_t last_page;
+ loff_t zero_len;
+
+ first_page =((offset + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT) << PAGE_CACHE_SHIFT;
+ last_page = ((offset + length) >> PAGE_CACHE_SHIFT) << PAGE_CACHE_SHIFT;
+ if (last_page > first_page) {
+ truncate_pagecache_range(inode, first_page, last_page - 1);
+ }
+ if (first_page > last_page) {
+ ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT, offset & (PAGE_CACHE_SIZE - 1), length);
+ return;
+ }
+ /*
+ * zero out the partial page that contains
+ * the start of the hole
+ */
+ zero_len = first_page - offset;
+ if (zero_len > 0) {
+ ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT, offset & (PAGE_CACHE_SIZE -1), zero_len);
+ }
+ /*
+ * zero out the partial page that contains
+ * the end of the hole
+ */
+ zero_len = offset + length - last_page;
+ if (zero_len > 0) {
+ ceph_zero_partial_page(inode, (offset + length) >> PAGE_CACHE_SHIFT, 0, zero_len);
+ }
+ /*
+ * If i_size is contained in the last page, we need to
+ * zero the partial page after i_size
+ */
+ if (inode->i_size >> PAGE_CACHE_SHIFT == (offset + length) >> PAGE_CACHE_SHIFT && inode->i_size % PAGE_CACHE_SIZE != 0) {
+ zero_len = PAGE_CACHE_SIZE -
+ (inode->i_size & (PAGE_CACHE_SIZE - 1));
+ if (zero_len > 0) {
+ ceph_zero_partial_page(inode, inode->i_size >> PAGE_CACHE_SHIFT, inode->i_size & (PAGE_CACHE_SIZE -1), zero_len);
+ }
+ }
+}
+
+static int ceph_delete_object_range(struct inode *inode, loff_t lstart, loff_t lend)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_osd_request *req;
+ u64 length = ceph_file_layout_object_size(ci->i_layout);
+ loff_t offset;
+ int ret = 0;
+
+ if (lstart > lend || length <= 0)
+ goto out;
+ for (offset = lstart; offset <= lend; offset += length) {
+ req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+ ceph_vino(inode), offset, &length,
+ 1, CEPH_OSD_OP_DELETE, CEPH_OSD_FLAG_ONDISK,
+ NULL,
+ ci->i_truncate_seq, ci->i_truncate_size,
+ false);
+ if (IS_ERR(req)) {
+ ret = PTR_ERR(req);
+ goto out;
+ }
+
+ ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+ if (!ret) {
+ ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ }
+ ceph_osdc_put_request(req);
+ /* object deleted */
+ if (ret == -ENOENT)
+ ret = 0;
+ }
+
+ out:
+ return ret;
+}
+
+static int ceph_zero_partial_object(struct file *file, loff_t offset, loff_t length)
+{
+ struct ceph_file_info *fi = file->private_data;
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_osd_request *req;
+ int want, got = 0, ret = 0;
+
+ if (length <= 0)
+ goto out;
+
+
+ if (fi->fmode & CEPH_FILE_MODE_LAZY)
+ want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+ else
+ want = CEPH_CAP_FILE_BUFFER;
+
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, offset+length);
+ if (ret < 0)
+ goto out;
+ if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
+ ceph_put_cap_refs(ci, got);
+ ret = -EAGAIN;
+ goto out;
+ }
+ req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+ ceph_vino(inode), offset, &length, 1,
+ CEPH_OSD_OP_ZERO, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+ NULL,
+ ci->i_truncate_seq, ci->i_truncate_size,
+ false);
+ if (IS_ERR(req)) {
+ ret = PTR_ERR(req);
+ goto out;
+ }
+
+ ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+ if (!ret) {
+ ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ }
+ ceph_osdc_put_request(req);
+ ceph_put_cap_refs(ci, got);
+
+ out:
+ return ret;
+}
+
+static int ceph_delete_and_zero_objects(struct file *file, loff_t offset, loff_t length)
+{
+ unsigned long first_object;
+ unsigned long last_object;
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ __s32 object_size;
+ __u32 object_shift;
+ loff_t zero_len;
+ int ret = 0;
+
+ if (!(object_size = ceph_file_layout_object_size(ci->i_layout)))
+ goto out;
+ if (object_size == 1) {
+ object_shift = 0;
+ } else {
+ for (object_shift = 0; ;object_shift++) {
+ if (2 << object_shift == object_size)
+ break;
+ }
+ object_shift++;
+ }
+
+ first_object =((offset + object_size - 1) >> object_shift) << object_shift;
+ last_object = ((offset + length) >> object_shift) << object_shift;
+ if (last_object > first_object) {
+ ret = ceph_delete_object_range(inode, first_object, last_object - 1);
+ if (ret)
+ goto out;
+ }
+ if (first_object > last_object) {
+ ret = ceph_zero_partial_object(file, offset, length);
+ goto out;
+ }
+ /*
+ * zero out the partial object that contains
+ * the start of the hole
+ */
+ zero_len = first_object - offset;
+ if (zero_len > 0) {
+ ret = ceph_zero_partial_object(file, offset, zero_len);
+ if (ret)
+ goto out;
+ }
+ /*
+ * zero out the partial object that contains
+ * the end of the hole
+ */
+ zero_len = offset + length - last_object;
+ if (zero_len > 0) {
+ ret = ceph_zero_partial_object(file, last_object, zero_len);
+ }
+
+ out:
+ return ret;
+}
+
+static int ceph_punch_hole(struct file *file, loff_t offset, loff_t length)
+{
+ struct inode *inode = file->f_dentry->d_inode;
+ int ret = 0;
+
+ if (!S_ISREG(inode->i_mode)) {
+ return -EOPNOTSUPP;
+ }
+ if (IS_SWAPFILE(inode)) {
+ return -ETXTBSY;
+ }
+ mutex_lock(&inode->i_mutex);
+
+ /* No need to punch hole beyond i_size */
+ if (offset >= inode->i_size)
+ goto out_unlock;
+
+ /*
+ * If the hole extends beyond i_size, set the hole
+ * to end after the page that contains i_size
+ */
+ if (offset + length > inode->i_size) {
+ length = inode->i_size +
+ PAGE_CACHE_SIZE - (inode->i_size & (PAGE_CACHE_SIZE - 1)) -
+ offset;
+ }
+
+ ceph_truncate_and_zero_page_cache(inode, offset, length);
+ ret = ceph_delete_and_zero_objects(file, offset, length);
+
+ out_unlock:
+ mutex_unlock(&inode->i_mutex);
+ return ret;
+}
+
+static long ceph_fallocate(struct file *file, int mode, loff_t offset, loff_t length)
+{
+ /* FALLOC_FL_PUNCH_HOLE must be used with FALLOC_FL_KEEP_SIZE */
+ if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
+ return -EOPNOTSUPP;
+ if (mode & FALLOC_FL_PUNCH_HOLE)
+ return ceph_punch_hole(file, offset, length);
+ return -EOPNOTSUPP;
+}
+
const struct file_operations ceph_file_fops = {
.open = ceph_open,
.release = ceph_release,
@@ -898,5 +1142,6 @@ const struct file_operations ceph_file_fops = {
.splice_write = generic_file_splice_write,
.unlocked_ioctl = ceph_ioctl,
.compat_ioctl = ceph_ioctl,
+ .fallocate = ceph_fallocate,
};
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3a246a6..a6d9671 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -503,7 +503,8 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
size_t payload_len = 0;
- BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+ BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
+ opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO);
op->extent.offset = offset;
op->extent.length = length;
@@ -631,6 +632,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
break;
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
+ case CEPH_OSD_OP_DELETE:
+ case CEPH_OSD_OP_ZERO:
if (src->op == CEPH_OSD_OP_WRITE)
request_data_len = src->extent.length;
dst->extent.offset = cpu_to_le64(src->extent.offset);
@@ -715,7 +718,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
u64 object_base;
int r;
- BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+ BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
+ opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO);
req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
GFP_NOFS);
--
1.7.9.5
On Fri, 14 Jun 2013, Li Wang wrote:
> This patch implements punch hole (fallocate) support for Ceph kernel
> file system client.
> We prepared two patches based on different kernel versions, one against
> kernel 3.8-rc3, the other against the latest 3.10-rc5. It is because
> unfortunately, we failed to set up a workable Ceph system with the client
> based on the lastest code from Linux kernel git tree, for the server
> side, we tried both the latest code from Ceph git tree and
> the latest v0.61.3 release. The client will easily hang there without
> any response, unless rebooting the machine.
This is odd; was there anything in dmesg? We test this nightly (currently
on 3.10-rc5).
> We managed to set up a Ceph system with the client based on Linux
> kernel 3.8-rc3 and the server based on Ceph v0.61.3, so the patch
> against v3.8-rc3 has been under preliminary tests. However, the one
> against v3.10-rc5 not.
Do they differ substantially? The one against the latest kernel is what
we'll ultimately apply and merge.
Comments inline...
Thanks!
sage
On Fri, 14 Jun 2013, Li Wang wrote:
> This patch implements punch hole (fallocate) support against
> Linux kernel 3.10-rc5.
>
> Signed-off-by: Li Wang <[email protected]>
> Signed-off-by: Yunchuan Wen <[email protected]>
> ---
> fs/ceph/file.c | 245 +++++++++++++++++++++++++++++++++++++++++++++++++
> net/ceph/osd_client.c | 8 +-
> 2 files changed, 251 insertions(+), 2 deletions(-)
>
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 656e169..e092b69 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -8,6 +8,7 @@
> #include <linux/namei.h>
> #include <linux/writeback.h>
> #include <linux/aio.h>
> +#include <linux/falloc.h>
>
> #include "super.h"
> #include "mds_client.h"
> @@ -882,6 +883,249 @@ out:
> return offset;
> }
>
> +static inline void ceph_zero_partial_page(struct inode *inode, pgoff_t index, unsigned start, unsigned size)
> +{
> + struct page *page;
> +
> + page = find_lock_page(inode->i_mapping, index);
> + if (page) {
> + zero_user(page, start, size);
> + unlock_page(page);
> + page_cache_release(page);
> + }
> +}
> +
> +static void ceph_truncate_and_zero_page_cache(struct inode *inode, loff_t offset, loff_t length)
> +{
> + loff_t first_page;
> + loff_t last_page;
> + loff_t zero_len;
> +
> + first_page =((offset + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT) << PAGE_CACHE_SHIFT;
whitespace
> + last_page = ((offset + length) >> PAGE_CACHE_SHIFT) << PAGE_CACHE_SHIFT;
> + if (last_page > first_page) {
> + truncate_pagecache_range(inode, first_page, last_page - 1);
> + }
> + if (first_page > last_page) {
> + ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT, offset & (PAGE_CACHE_SIZE - 1), length);
> + return;
> + }
> + /*
> + * zero out the partial page that contains
> + * the start of the hole
> + */
> + zero_len = first_page - offset;
here too
> + if (zero_len > 0) {
> + ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT, offset & (PAGE_CACHE_SIZE -1), zero_len);
> + }
> + /*
> + * zero out the partial page that contains
> + * the end of the hole
> + */
> + zero_len = offset + length - last_page;
> + if (zero_len > 0) {
> + ceph_zero_partial_page(inode, (offset + length) >> PAGE_CACHE_SHIFT, 0, zero_len);
> + }
> + /*
> + * If i_size is contained in the last page, we need to
> + * zero the partial page after i_size
> + */
> + if (inode->i_size >> PAGE_CACHE_SHIFT == (offset + length) >> PAGE_CACHE_SHIFT && inode->i_size % PAGE_CACHE_SIZE != 0) {
> + zero_len = PAGE_CACHE_SIZE -
> + (inode->i_size & (PAGE_CACHE_SIZE - 1));
> + if (zero_len > 0) {
> + ceph_zero_partial_page(inode, inode->i_size >> PAGE_CACHE_SHIFT, inode->i_size & (PAGE_CACHE_SIZE -1), zero_len);
> + }
> + }
> +}
> +
> +static int ceph_delete_object_range(struct inode *inode, loff_t lstart, loff_t lend)
> +{
> + struct ceph_inode_info *ci = ceph_inode(inode);
> + struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> + struct ceph_osd_request *req;
> + u64 length = ceph_file_layout_object_size(ci->i_layout);
> + loff_t offset;
> + int ret = 0;
> +
> + if (lstart > lend || length <= 0)
> + goto out;
> + for (offset = lstart; offset <= lend; offset += length) {
> + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> + ceph_vino(inode), offset, &length,
> + 1, CEPH_OSD_OP_DELETE, CEPH_OSD_FLAG_ONDISK,
> + NULL,
> + ci->i_truncate_seq, ci->i_truncate_size,
> + false);
> + if (IS_ERR(req)) {
> + ret = PTR_ERR(req);
> + goto out;
> + }
One tweak here: for the very first object, we want to truncate it to 0
instead of deleting it. The object has some other metadata attached to
it (the mds's backtrace structure is stored there as an attr) and we don't
want to lose that.
> +
> + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> + if (!ret) {
> + ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> + }
> + ceph_osdc_put_request(req);
> + /* object deleted */
> + if (ret == -ENOENT)
> + ret = 0;
> + }
> +
> + out:
> + return ret;
> +}
> +
> +static int ceph_zero_partial_object(struct file *file, loff_t offset, loff_t length)
> +{
> + struct ceph_file_info *fi = file->private_data;
> + struct inode *inode = file->f_dentry->d_inode;
> + struct ceph_inode_info *ci = ceph_inode(inode);
> + struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> + struct ceph_osd_request *req;
> + int want, got = 0, ret = 0;
> +
> + if (length <= 0)
> + goto out;
> +
> +
> + if (fi->fmode & CEPH_FILE_MODE_LAZY)
> + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
> + else
> + want = CEPH_CAP_FILE_BUFFER;
> +
> + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, offset+length);
> + if (ret < 0)
> + goto out;
> + if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
> + ceph_put_cap_refs(ci, got);
> + ret = -EAGAIN;
> + goto out;
> + }
I think we should do all of the cap checks in the outer caller, so that it
happens only once for the entire hole punch operation... not on every
object.
> + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> + ceph_vino(inode), offset, &length, 1,
> + CEPH_OSD_OP_ZERO, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
> + NULL,
> + ci->i_truncate_seq, ci->i_truncate_size,
> + false);
> + if (IS_ERR(req)) {
> + ret = PTR_ERR(req);
> + goto out;
> + }
> +
> + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> + if (!ret) {
> + ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> + }
> + ceph_osdc_put_request(req);
> + ceph_put_cap_refs(ci, got);
> +
> + out:
> + return ret;
> +}
> +
> +static int ceph_delete_and_zero_objects(struct file *file, loff_t offset, loff_t length)
> +{
> + unsigned long first_object;
> + unsigned long last_object;
> + struct inode *inode = file->f_dentry->d_inode;
> + struct ceph_inode_info *ci = ceph_inode(inode);
> + __s32 object_size;
> + __u32 object_shift;
> + loff_t zero_len;
> + int ret = 0;
> +
> + if (!(object_size = ceph_file_layout_object_size(ci->i_layout)))
> + goto out;
> + if (object_size == 1) {
> + object_shift = 0;
> + } else {
> + for (object_shift = 0; ;object_shift++) {
> + if (2 << object_shift == object_size)
> + break;
> + }
> + object_shift++;
> + }
Hmm, ok, here is where we run into a problem. The default striping
strategy is very simple: 4 MB objects. But the layout can support more
complicated layouts, like:
obj0 obj1 obj2 obj3 obj4 ...
0 1 8 9 16 ...
2 3 10 11 ...
4 5 12 13
6 7 14 15
which means that a hole punch (say, from 4-14) may end up truncating
several objects (obj0 and obj1) and zeroing ranges in several others (obj2
and obj3). The read/write path keep things simple by just writing a
stripe unit at a time. That's not efficient for these types of layouts,
but we haven't bothered to do anything more complicated since nobody
really uses these weird layouts. The hole punch needs to at least be
correct, however, even if it isn't efficient. Zeroing in pieces will get
the right result, but won't be storage efficient because you may zero
several pieces of an object instead of just deleting it. So this may be
the time to solve that particular problem.
At a high level, what we need is a way to map a file range onto a vector
of objects and ranges within those objects. We can do this in "period"
increments (where period is object_size * stripe_count bytes) so that the
array/vector sizes are known in advance ((object_size / stripe_unit) *
stripe_count cells). Probably a helper that calculates the mapping onto
objects, so that zero can zero several stripe units/cells at at once, or
remove entire objects.
Then in the future we can make the read/write path also make use of it.
For the IO case we also will need to know how file offsets map to object
offsets, but for the zero case that's not needed, so you could ignore it
fore now (eventually it should probably be an optional output
argument/pointer).
Does that make sense?
> +
> + first_object =((offset + object_size - 1) >> object_shift) << object_shift;
whitespace
> + last_object = ((offset + length) >> object_shift) << object_shift;
> + if (last_object > first_object) {
> + ret = ceph_delete_object_range(inode, first_object, last_object - 1);
> + if (ret)
> + goto out;
> + }
> + if (first_object > last_object) {
> + ret = ceph_zero_partial_object(file, offset, length);
> + goto out;
> + }
> + /*
> + * zero out the partial object that contains
> + * the start of the hole
> + */
> + zero_len = first_object - offset;
> + if (zero_len > 0) {
> + ret = ceph_zero_partial_object(file, offset, zero_len);
> + if (ret)
> + goto out;
> + }
> + /*
> + * zero out the partial object that contains
> + * the end of the hole
> + */
> + zero_len = offset + length - last_object;
> + if (zero_len > 0) {
> + ret = ceph_zero_partial_object(file, last_object, zero_len);
> + }
> +
> + out:
> + return ret;
> +}
> +
> +static int ceph_punch_hole(struct file *file, loff_t offset, loff_t length)
> +{
> + struct inode *inode = file->f_dentry->d_inode;
> + int ret = 0;
> +
> + if (!S_ISREG(inode->i_mode)) {
> + return -EOPNOTSUPP;
> + }
> + if (IS_SWAPFILE(inode)) {
> + return -ETXTBSY;
> + }
> + mutex_lock(&inode->i_mutex);
> +
> + /* No need to punch hole beyond i_size */
> + if (offset >= inode->i_size)
> + goto out_unlock;
> +
> + /*
> + * If the hole extends beyond i_size, set the hole
> + * to end after the page that contains i_size
> + */
> + if (offset + length > inode->i_size) {
> + length = inode->i_size +
> + PAGE_CACHE_SIZE - (inode->i_size & (PAGE_CACHE_SIZE - 1)) -
> + offset;
> + }
I think we should do teh caps stuff here.
> +
> + ceph_truncate_and_zero_page_cache(inode, offset, length);
> + ret = ceph_delete_and_zero_objects(file, offset, length);
> +
> + out_unlock:
> + mutex_unlock(&inode->i_mutex);
> + return ret;
> +}
> +
> +static long ceph_fallocate(struct file *file, int mode, loff_t offset, loff_t length)
> +{
> + /* FALLOC_FL_PUNCH_HOLE must be used with FALLOC_FL_KEEP_SIZE */
That is just to simplify the implementation, right?
> + if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
> + return -EOPNOTSUPP;
> + if (mode & FALLOC_FL_PUNCH_HOLE)
> + return ceph_punch_hole(file, offset, length);
> + return -EOPNOTSUPP;
> +}
> +
> const struct file_operations ceph_file_fops = {
> .open = ceph_open,
> .release = ceph_release,
> @@ -898,5 +1142,6 @@ const struct file_operations ceph_file_fops = {
> .splice_write = generic_file_splice_write,
> .unlocked_ioctl = ceph_ioctl,
> .compat_ioctl = ceph_ioctl,
> + .fallocate = ceph_fallocate,
> };
>
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 3a246a6..a6d9671 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -503,7 +503,8 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
> struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
> size_t payload_len = 0;
>
> - BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
> + BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
> + opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO);
>
> op->extent.offset = offset;
> op->extent.length = length;
> @@ -631,6 +632,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
> break;
> case CEPH_OSD_OP_READ:
> case CEPH_OSD_OP_WRITE:
> + case CEPH_OSD_OP_DELETE:
> + case CEPH_OSD_OP_ZERO:
> if (src->op == CEPH_OSD_OP_WRITE)
> request_data_len = src->extent.length;
> dst->extent.offset = cpu_to_le64(src->extent.offset);
> @@ -715,7 +718,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
> u64 object_base;
> int r;
>
> - BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
> + BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
> + opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO);
>
> req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
> GFP_NOFS);
> --
> 1.7.9.5
>
>
>
This patch implements punch hole (fallocate) support for Ceph.
Signed-off-by: Li Wang <[email protected]>
Signed-off-by: Yunchuan Wen <[email protected]>
---
fs/ceph/file.c | 313
+++++++++++++++++++++++++++++++++++++++++++++++++
net/ceph/osd_client.c | 8 +-
2 files changed, 319 insertions(+), 2 deletions(-)
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 656e169..578e5fd 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -8,6 +8,7 @@
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/aio.h>
+#include <linux/falloc.h>
#include "super.h"
#include "mds_client.h"
@@ -882,6 +883,317 @@ out:
return offset;
}
+static inline void ceph_zero_partial_page(struct inode *inode, pgoff_t
index, unsigned start, unsigned size)
+{
+ struct page *page;
+
+ page = find_lock_page(inode->i_mapping, index);
+ if (page) {
+ zero_user(page, start, size);
+ unlock_page(page);
+ page_cache_release(page);
+ }
+}
+
+static void ceph_truncate_and_zero_page_cache(struct inode *inode,
loff_t offset, loff_t length)
+{
+ loff_t first_page;
+ loff_t last_page;
+ loff_t zero_len;
+
+ first_page =((offset + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT) <<
PAGE_CACHE_SHIFT;
+ last_page = ((offset + length) >> PAGE_CACHE_SHIFT) << PAGE_CACHE_SHIFT;
+ if (last_page > first_page) {
+ truncate_pagecache_range(inode, first_page, last_page - 1);
+ }
+ if (first_page > last_page) {
+ ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT, offset &
(PAGE_CACHE_SIZE - 1), length);
+ return;
+ }
+ /*
+ * zero out the partial page that contains
+ * the start of the hole
+ */
+ zero_len = first_page - offset;
+ if (zero_len > 0) {
+ ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT, offset &
(PAGE_CACHE_SIZE -1), zero_len);
+ }
+ /*
+ * zero out the partial page that contains
+ * the end of the hole
+ */
+ zero_len = offset + length - last_page;
+ if (zero_len > 0) {
+ ceph_zero_partial_page(inode, (offset + length) >> PAGE_CACHE_SHIFT,
0, zero_len);
+ }
+ /*
+ * If i_size is contained in the last page, we need to
+ * zero the partial page after i_size
+ */
+ if (inode->i_size >> PAGE_CACHE_SHIFT == (offset + length) >>
PAGE_CACHE_SHIFT && inode->i_size % PAGE_CACHE_SIZE != 0) {
+ zero_len = PAGE_CACHE_SIZE -
+ (inode->i_size & (PAGE_CACHE_SIZE - 1));
+ if (zero_len > 0) {
+ ceph_zero_partial_page(inode, inode->i_size >> PAGE_CACHE_SHIFT,
inode->i_size & (PAGE_CACHE_SIZE -1), zero_len);
+ }
+ }
+}
+
+static inline __u32 ceph_calculate_shift(__s64 size)
+{
+ int shift;
+
+ if (size <= 0)
+ return -1;
+ if (size == 1)
+ return 0;
+ for (shift = 0; ;shift++) {
+ if (2 << shift == size)
+ break;
+ }
+ shift++;
+
+ return shift;
+}
+
+static int ceph_delete_object(struct inode *inode, u64 offset, u64 *length)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_osd_request *req;
+ int ret = 0;
+
+ req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+ ceph_vino(inode), offset, length, 1,
+ CEPH_OSD_OP_DELETE,
CEPH_OSD_FLAG_ONDISK,
+ NULL,
+ ci->i_truncate_seq,
ci->i_truncate_size,
+ false);
+ if (IS_ERR(req)) {
+ ret = PTR_ERR(req);
+ goto out;
+ }
+
+ ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+ if (!ret) {
+ ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ }
+ ceph_osdc_put_request(req);
+
+ out:
+ return ret;
+}
+
+static int ceph_zero_partial_object(struct inode *inode, loff_t offset,
loff_t *length)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_osd_request *req;
+ int ret = 0;
+
+ if (length <= 0)
+ goto out;
+
+
+ req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+ ceph_vino(inode), offset, length, 1,
+ CEPH_OSD_OP_ZERO,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+ NULL,
+ ci->i_truncate_seq,
ci->i_truncate_size,
+ false);
+ if (IS_ERR(req)) {
+ ret = PTR_ERR(req);
+ goto out;
+ }
+
+ ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+ if (!ret) {
+ ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ }
+ ceph_osdc_put_request(req);
+
+ out:
+ return ret;
+}
+
+static int ceph_zero_partial_object_set(struct inode *inode, loff_t
start, loff_t end)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ __s32 stripe_unit_size = ceph_file_layout_su(ci->i_layout);
+ __u32 stripe_unit_shift = ceph_calculate_shift(stripe_unit_size);
+ loff_t first_stripe_unit = ((start + stripe_unit_size -1 ) >>
stripe_unit_shift) << stripe_unit_shift;
+ loff_t last_stripe_unit = ((end + 1) >> stripe_unit_shift) <<
stripe_unit_shift;
+ u64 i;
+ loff_t length;
+ int ret = 0;
+
+ if (last_stripe_unit > first_stripe_unit) {
+ for (i = first_stripe_unit; i < last_stripe_unit; i +=
stripe_unit_size) {
+ length = (u64) stripe_unit_size;
+ ret = ceph_zero_partial_object(inode, i, &length);
+ if (ret)
+ goto out;
+ }
+ }
+ if (first_stripe_unit > last_stripe_unit) {
+ length = end - start + 1;
+ ret = ceph_zero_partial_object(inode, start, &length);
+ goto out;
+ }
+ length = first_stripe_unit - start;
+ if (length > 0) {
+ ret = ceph_zero_partial_object(inode, start, &length);
+ if (ret)
+ goto out;
+ }
+ length = end - last_stripe_unit + 1;
+ if (length > 0) {
+ ret = ceph_zero_partial_object(inode, last_stripe_unit, &length);
+ }
+
+ out:
+ return ret;
+}
+
+static int ceph_delete_and_zero_objects(struct file *file, loff_t
offset, loff_t length)
+{
+ struct ceph_file_info *fi = file->private_data;
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ __s32 stripe_unit_size = ceph_file_layout_su(ci->i_layout);
+ __s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
+ unsigned stripe_width = ceph_file_layout_stripe_width(&ci->i_layout);
+ __s32 object_size = ceph_file_layout_object_size(ci->i_layout);
+ __s32 object_set_size = object_size * stripe_count;
+ __u32 object_set_shift = ceph_calculate_shift(object_set_size);
+ __u32 stripe_unit_count_per_object = object_size / stripe_unit_size;
+ loff_t first_object_set = ((offset + object_set_size - 1) >>
object_set_shift) << object_set_shift;
+ loff_t last_object_set = ((offset + length) >> object_set_shift) <<
object_set_shift;
+ loff_t i, j;
+ int want, got = 0;
+ int dirty;
+ u64 len;
+ int ret = 0;
+
+ if (fi->fmode & CEPH_FILE_MODE_LAZY)
+ want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+ else
+ want = CEPH_CAP_FILE_BUFFER;
+
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, offset + length);
+ if (ret < 0)
+ return ret;
+ if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
+ ret = -EAGAIN;
+ goto out;
+ }
+
+ /* [offset, offset+length] does not across object set bundary.
+ * Yes, there are possibilities to delete some objects within
+ * a object set, however, we want to keep it simple, not to incur
+ * comprehensive calculation, so for a partial hole within a object
+ * set, we zero only
+ */
+ if (first_object_set > last_object_set) {
+ ret = ceph_zero_partial_object_set(inode, offset, offset + length - 1);
+ goto out;
+ }
+ /* [offset, offset+length] contains at least one complete object set */
+ if (last_object_set > first_object_set) {
+ len = (u64)stripe_unit_size;
+ /*
+ * For the very first object, zero it instead of deleting it,
+ * since there are attached metada on it
+ */
+ if (first_object_set == 0) {
+ for (i = 0; i < stripe_unit_count_per_object; i++) {
+ ret = ceph_zero_partial_object(inode, first_object_set +
i*stripe_width, &len);
+ if (ret)
+ goto out;
+ }
+ }
+ for (i = first_object_set; i < last_object_set; i += object_set_size) {
+ for (j = i; j < i + stripe_width; j += stripe_unit_size) {
+ /* skip the very first object */
+ if (j == 0)
+ continue;
+ ret = ceph_delete_object(inode, j, &len);
+ /* object already deleted */
+ if (ret == -ENOENT)
+ ret = 0;
+ if (ret)
+ goto out;
+ }
+ }
+ }
+
+ /* deal with the object set contains the start or the end of the hole */
+ if (first_object_set - offset > 0) {
+ ret = ceph_zero_partial_object_set(inode, offset, first_object_set - 1);
+ if (ret)
+ goto out;
+ }
+ if (offset + length - last_object_set > 0) {
+ ret = ceph_zero_partial_object_set(inode, last_object_set, offset +
length - 1);
+ }
+
+ out:
+ if (ret == 0) {
+ spin_lock(&ci->i_ceph_lock);
+ dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+ spin_unlock(&ci->i_ceph_lock);
+ if (dirty)
+ __mark_inode_dirty(inode, dirty);
+ }
+ ceph_put_cap_refs(ci, got);
+ return ret;
+}
+
+static int ceph_punch_hole(struct file *file, loff_t offset, loff_t length)
+{
+ struct inode *inode = file->f_dentry->d_inode;
+ int ret = 0;
+
+ if (!S_ISREG(inode->i_mode)) {
+ return -EOPNOTSUPP;
+ }
+ if (IS_SWAPFILE(inode)) {
+ return -ETXTBSY;
+ }
+ mutex_lock(&inode->i_mutex);
+
+ /* No need to punch hole beyond i_size */
+ if (offset >= inode->i_size)
+ goto out_unlock;
+
+ /*
+ * If the hole extends beyond i_size, set the hole
+ * to end after the page that contains i_size
+ */
+ if (offset + length > inode->i_size) {
+ length = inode->i_size +
+ PAGE_CACHE_SIZE - (inode->i_size & (PAGE_CACHE_SIZE - 1)) -
+ offset;
+ }
+
+ ceph_truncate_and_zero_page_cache(inode, offset, length);
+ ret = ceph_delete_and_zero_objects(file, offset, length);
+
+ out_unlock:
+ mutex_unlock(&inode->i_mutex);
+ return ret;
+}
+
+static long ceph_fallocate(struct file *file, int mode, loff_t offset,
loff_t length)
+{
+ /* FALLOC_FL_PUNCH_HOLE must be used with FALLOC_FL_KEEP_SIZE */
+ if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
+ return -EOPNOTSUPP;
+ if (mode & FALLOC_FL_PUNCH_HOLE)
+ return ceph_punch_hole(file, offset, length);
+ return -EOPNOTSUPP;
+}
+
const struct file_operations ceph_file_fops = {
.open = ceph_open,
.release = ceph_release,
@@ -898,5 +1210,6 @@ const struct file_operations ceph_file_fops = {
.splice_write = generic_file_splice_write,
.unlocked_ioctl = ceph_ioctl,
.compat_ioctl = ceph_ioctl,
+ .fallocate = ceph_fallocate,
};
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3a246a6..a6d9671 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -503,7 +503,8 @@ void osd_req_op_extent_init(struct ceph_osd_request
*osd_req,
struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
size_t payload_len = 0;
- BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+ BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
+ opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO);
op->extent.offset = offset;
op->extent.length = length;
@@ -631,6 +632,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request
*req,
break;
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
+ case CEPH_OSD_OP_DELETE:
+ case CEPH_OSD_OP_ZERO:
if (src->op == CEPH_OSD_OP_WRITE)
request_data_len = src->extent.length;
dst->extent.offset = cpu_to_le64(src->extent.offset);
@@ -715,7 +718,8 @@ struct ceph_osd_request
*ceph_osdc_new_request(struct ceph_osd_client *osdc,
u64 object_base;
int r;
- BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+ BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
+ opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO);
req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
GFP_NOFS);
--
1.7.9.5
Hi Li,
There is a version of fsx.c floating around that tests hole punching...
have you tried running that on top of this patch? Ideally, we should
build a test (ceph.git/qa/workunits/rbd/hole_punch.sh or similar) that
tests the hole punch both with a default file layout and with a more
complicated striping pattern (e.g. object_size=1048576 stripe_unit=65536
stripe_count=7).
sage
On Thu, 20 Jun 2013, Li Wang wrote:
> This patch implements punch hole (fallocate) support for Ceph.
>
> Signed-off-by: Li Wang <[email protected]>
> Signed-off-by: Yunchuan Wen <[email protected]>
> ---
> fs/ceph/file.c | 313
> +++++++++++++++++++++++++++++++++++++++++++++++++
> net/ceph/osd_client.c | 8 +-
> 2 files changed, 319 insertions(+), 2 deletions(-)
>
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 656e169..578e5fd 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -8,6 +8,7 @@
> #include <linux/namei.h>
> #include <linux/writeback.h>
> #include <linux/aio.h>
> +#include <linux/falloc.h>
>
> #include "super.h"
> #include "mds_client.h"
> @@ -882,6 +883,317 @@ out:
> return offset;
> }
>
> +static inline void ceph_zero_partial_page(struct inode *inode, pgoff_t index,
> unsigned start, unsigned size)
> +{
> + struct page *page;
> +
> + page = find_lock_page(inode->i_mapping, index);
> + if (page) {
> + zero_user(page, start, size);
> + unlock_page(page);
> + page_cache_release(page);
> + }
> +}
> +
> +static void ceph_truncate_and_zero_page_cache(struct inode *inode, loff_t
> offset, loff_t length)
> +{
> + loff_t first_page;
> + loff_t last_page;
> + loff_t zero_len;
> +
> + first_page =((offset + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT) <<
> PAGE_CACHE_SHIFT;
> + last_page = ((offset + length) >> PAGE_CACHE_SHIFT) <<
> PAGE_CACHE_SHIFT;
> + if (last_page > first_page) {
> + truncate_pagecache_range(inode, first_page, last_page - 1);
> + }
> + if (first_page > last_page) {
> + ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT,
> offset & (PAGE_CACHE_SIZE - 1), length);
> + return;
> + }
> + /*
> + * zero out the partial page that contains
> + * the start of the hole
> + */
> + zero_len = first_page - offset;
> + if (zero_len > 0) {
> + ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT,
> offset & (PAGE_CACHE_SIZE -1), zero_len);
> + }
> + /*
> + * zero out the partial page that contains
> + * the end of the hole
> + */
> + zero_len = offset + length - last_page;
> + if (zero_len > 0) {
> + ceph_zero_partial_page(inode, (offset + length) >>
> PAGE_CACHE_SHIFT, 0, zero_len);
> + }
> + /*
> + * If i_size is contained in the last page, we need to
> + * zero the partial page after i_size
> + */
> + if (inode->i_size >> PAGE_CACHE_SHIFT == (offset + length) >>
> PAGE_CACHE_SHIFT && inode->i_size % PAGE_CACHE_SIZE != 0) {
> + zero_len = PAGE_CACHE_SIZE -
> + (inode->i_size & (PAGE_CACHE_SIZE - 1));
> + if (zero_len > 0) {
> + ceph_zero_partial_page(inode, inode->i_size >>
> PAGE_CACHE_SHIFT, inode->i_size & (PAGE_CACHE_SIZE -1), zero_len);
> + }
> + }
> +}
> +
> +static inline __u32 ceph_calculate_shift(__s64 size)
> +{
> + int shift;
> +
> + if (size <= 0)
> + return -1;
> + if (size == 1)
> + return 0;
> + for (shift = 0; ;shift++) {
> + if (2 << shift == size)
> + break;
> + }
> + shift++;
> +
> + return shift;
> +}
> +
> +static int ceph_delete_object(struct inode *inode, u64 offset, u64 *length)
> +{
> + struct ceph_inode_info *ci = ceph_inode(inode);
> + struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> + struct ceph_osd_request *req;
> + int ret = 0;
> +
> + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> + ceph_vino(inode), offset, length, 1,
> + CEPH_OSD_OP_DELETE, CEPH_OSD_FLAG_ONDISK,
> + NULL,
> + ci->i_truncate_seq, ci->i_truncate_size,
> + false);
> + if (IS_ERR(req)) {
> + ret = PTR_ERR(req);
> + goto out;
> + }
> +
> + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> + if (!ret) {
> + ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> + }
> + ceph_osdc_put_request(req);
> +
> + out:
> + return ret;
> +}
> +
> +static int ceph_zero_partial_object(struct inode *inode, loff_t offset,
> loff_t *length)
> +{
> + struct ceph_inode_info *ci = ceph_inode(inode);
> + struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> + struct ceph_osd_request *req;
> + int ret = 0;
> +
> + if (length <= 0)
> + goto out;
> +
> +
> + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> + ceph_vino(inode), offset, length, 1,
> + CEPH_OSD_OP_ZERO, CEPH_OSD_FLAG_WRITE |
> CEPH_OSD_FLAG_ONDISK,
> + NULL,
> + ci->i_truncate_seq, ci->i_truncate_size,
> + false);
> + if (IS_ERR(req)) {
> + ret = PTR_ERR(req);
> + goto out;
> + }
> +
> + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> + if (!ret) {
> + ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> + }
> + ceph_osdc_put_request(req);
> +
> + out:
> + return ret;
> +}
> +
> +static int ceph_zero_partial_object_set(struct inode *inode, loff_t start,
> loff_t end)
> +{
> + struct ceph_inode_info *ci = ceph_inode(inode);
> + __s32 stripe_unit_size = ceph_file_layout_su(ci->i_layout);
> + __u32 stripe_unit_shift = ceph_calculate_shift(stripe_unit_size);
> + loff_t first_stripe_unit = ((start + stripe_unit_size -1 ) >>
> stripe_unit_shift) << stripe_unit_shift;
> + loff_t last_stripe_unit = ((end + 1) >> stripe_unit_shift) <<
> stripe_unit_shift;
> + u64 i;
> + loff_t length;
> + int ret = 0;
> +
> + if (last_stripe_unit > first_stripe_unit) {
> + for (i = first_stripe_unit; i < last_stripe_unit; i +=
> stripe_unit_size) {
> + length = (u64) stripe_unit_size;
> + ret = ceph_zero_partial_object(inode, i, &length);
> + if (ret)
> + goto out;
> + }
> + }
> + if (first_stripe_unit > last_stripe_unit) {
> + length = end - start + 1;
> + ret = ceph_zero_partial_object(inode, start, &length);
> + goto out;
> + }
> + length = first_stripe_unit - start;
> + if (length > 0) {
> + ret = ceph_zero_partial_object(inode, start, &length);
> + if (ret)
> + goto out;
> + }
> + length = end - last_stripe_unit + 1;
> + if (length > 0) {
> + ret = ceph_zero_partial_object(inode, last_stripe_unit,
> &length);
> + }
> +
> + out:
> + return ret;
> +}
> +
> +static int ceph_delete_and_zero_objects(struct file *file, loff_t offset,
> loff_t length)
> +{
> + struct ceph_file_info *fi = file->private_data;
> + struct inode *inode = file->f_dentry->d_inode;
> + struct ceph_inode_info *ci = ceph_inode(inode);
> + __s32 stripe_unit_size = ceph_file_layout_su(ci->i_layout);
> + __s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
> + unsigned stripe_width = ceph_file_layout_stripe_width(&ci->i_layout);
> + __s32 object_size = ceph_file_layout_object_size(ci->i_layout);
> + __s32 object_set_size = object_size * stripe_count;
> + __u32 object_set_shift = ceph_calculate_shift(object_set_size);
> + __u32 stripe_unit_count_per_object = object_size / stripe_unit_size;
> + loff_t first_object_set = ((offset + object_set_size - 1) >>
> object_set_shift) << object_set_shift;
> + loff_t last_object_set = ((offset + length) >> object_set_shift) <<
> object_set_shift;
> + loff_t i, j;
> + int want, got = 0;
> + int dirty;
> + u64 len;
> + int ret = 0;
> +
> + if (fi->fmode & CEPH_FILE_MODE_LAZY)
> + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
> + else
> + want = CEPH_CAP_FILE_BUFFER;
> +
> + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, offset +
> length);
> + if (ret < 0)
> + return ret;
> + if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
> + ret = -EAGAIN;
> + goto out;
> + }
> +
> + /* [offset, offset+length] does not across object set bundary.
> + * Yes, there are possibilities to delete some objects within
> + * a object set, however, we want to keep it simple, not to incur
> + * comprehensive calculation, so for a partial hole within a object
> + * set, we zero only
> + */
> + if (first_object_set > last_object_set) {
> + ret = ceph_zero_partial_object_set(inode, offset, offset +
> length - 1);
> + goto out;
> + }
> + /* [offset, offset+length] contains at least one complete object set
> */
> + if (last_object_set > first_object_set) {
> + len = (u64)stripe_unit_size;
> + /*
> + * For the very first object, zero it instead of deleting it,
> + * since there are attached metada on it
> + */
> + if (first_object_set == 0) {
> + for (i = 0; i < stripe_unit_count_per_object; i++) {
> + ret = ceph_zero_partial_object(inode,
> first_object_set + i*stripe_width, &len);
> + if (ret)
> + goto out;
> + }
> + }
> + for (i = first_object_set; i < last_object_set; i +=
> object_set_size) {
> + for (j = i; j < i + stripe_width; j +=
> stripe_unit_size) {
> + /* skip the very first object */
> + if (j == 0)
> + continue;
> + ret = ceph_delete_object(inode, j, &len);
> + /* object already deleted */
> + if (ret == -ENOENT)
> + ret = 0;
> + if (ret)
> + goto out;
> + }
> + }
> + }
> +
> + /* deal with the object set contains the start or the end of the hole
> */
> + if (first_object_set - offset > 0) {
> + ret = ceph_zero_partial_object_set(inode, offset,
> first_object_set - 1);
> + if (ret)
> + goto out;
> + }
> + if (offset + length - last_object_set > 0) {
> + ret = ceph_zero_partial_object_set(inode, last_object_set,
> offset + length - 1);
> + }
> +
> + out:
> + if (ret == 0) {
> + spin_lock(&ci->i_ceph_lock);
> + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
> + spin_unlock(&ci->i_ceph_lock);
> + if (dirty)
> + __mark_inode_dirty(inode, dirty);
> + }
> + ceph_put_cap_refs(ci, got);
> + return ret;
> +}
> +
> +static int ceph_punch_hole(struct file *file, loff_t offset, loff_t length)
> +{
> + struct inode *inode = file->f_dentry->d_inode;
> + int ret = 0;
> +
> + if (!S_ISREG(inode->i_mode)) {
> + return -EOPNOTSUPP;
> + }
> + if (IS_SWAPFILE(inode)) {
> + return -ETXTBSY;
> + }
> + mutex_lock(&inode->i_mutex);
> +
> + /* No need to punch hole beyond i_size */
> + if (offset >= inode->i_size)
> + goto out_unlock;
> +
> + /*
> + * If the hole extends beyond i_size, set the hole
> + * to end after the page that contains i_size
> + */
> + if (offset + length > inode->i_size) {
> + length = inode->i_size +
> + PAGE_CACHE_SIZE - (inode->i_size & (PAGE_CACHE_SIZE - 1)) -
> + offset;
> + }
> +
> + ceph_truncate_and_zero_page_cache(inode, offset, length);
> + ret = ceph_delete_and_zero_objects(file, offset, length);
> +
> + out_unlock:
> + mutex_unlock(&inode->i_mutex);
> + return ret;
> +}
> +
> +static long ceph_fallocate(struct file *file, int mode, loff_t offset, loff_t
> length)
> +{
> + /* FALLOC_FL_PUNCH_HOLE must be used with FALLOC_FL_KEEP_SIZE */
> + if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
> + return -EOPNOTSUPP;
> + if (mode & FALLOC_FL_PUNCH_HOLE)
> + return ceph_punch_hole(file, offset, length);
> + return -EOPNOTSUPP;
> +}
> +
> const struct file_operations ceph_file_fops = {
> .open = ceph_open,
> .release = ceph_release,
> @@ -898,5 +1210,6 @@ const struct file_operations ceph_file_fops = {
> .splice_write = generic_file_splice_write,
> .unlocked_ioctl = ceph_ioctl,
> .compat_ioctl = ceph_ioctl,
> + .fallocate = ceph_fallocate,
> };
>
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 3a246a6..a6d9671 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -503,7 +503,8 @@ void osd_req_op_extent_init(struct ceph_osd_request
> *osd_req,
> struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
> size_t payload_len = 0;
>
> - BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
> + BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
> + opcode != CEPH_OSD_OP_DELETE && opcode !=
> CEPH_OSD_OP_ZERO);
>
> op->extent.offset = offset;
> op->extent.length = length;
> @@ -631,6 +632,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
> break;
> case CEPH_OSD_OP_READ:
> case CEPH_OSD_OP_WRITE:
> + case CEPH_OSD_OP_DELETE:
> + case CEPH_OSD_OP_ZERO:
> if (src->op == CEPH_OSD_OP_WRITE)
> request_data_len = src->extent.length;
> dst->extent.offset = cpu_to_le64(src->extent.offset);
> @@ -715,7 +718,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct
> ceph_osd_client *osdc,
> u64 object_base;
> int r;
>
> - BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
> + BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
> + opcode != CEPH_OSD_OP_DELETE && opcode !=
> CEPH_OSD_OP_ZERO);
>
> req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
> GFP_NOFS);
> --
> 1.7.9.5
>
>
>
On Wed, Jun 19, 2013 at 09:31:21AM -0700, Sage Weil wrote:
> Hi Li,
>
> There is a version of fsx.c floating around that tests hole punching...
> have you tried running that on top of this patch? Ideally, we should
> build a test (ceph.git/qa/workunits/rbd/hole_punch.sh or similar) that
> tests the hole punch both with a default file layout and with a more
> complicated striping pattern (e.g. object_size=1048576 stripe_unit=65536
> stripe_count=7).
The version in xfstests has hole punch support, as does the version
of fsstress. There are also some corner case tests for punch
behaviour, so running the generic tests in xfstests
should shake out most bugs....
Cheers,
Dave.
--
Dave Chinner
[email protected]
On 06/19/2013 11:23:51 AM, Li Wang wrote:
> This patch implements punch hole (fallocate) support for Ceph.
>
> Signed-off-by: Li Wang <[email protected]>
> Signed-off-by: Yunchuan Wen <[email protected]>
> +static int ceph_delete_object(struct inode *inode, u64 offset, u64
> *length)
> +{
> + struct ceph_inode_info *ci = ceph_inode(inode);
> + struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> + struct ceph_osd_request *req;
Mixing tabs and spaces.
> +static int ceph_punch_hole(struct file *file, loff_t offset, loff_t
> length)
> +{
> + struct inode *inode = file->f_dentry->d_inode;
> + int ret = 0;
> +
> + if (!S_ISREG(inode->i_mode)) {
> + return -EOPNOTSUPP;
> + }
And again.
Rob-
This patch implements fallocate and punch hole support for Ceph kernel client.
Signed-off-by: Li Wang <[email protected]>
Signed-off-by: Yunchuan Wen <[email protected]>
---
Passed the fsx test from xfstests.
---
fs/ceph/file.c | 191 +++++++++++++++++++++++++++++++++++++++++++++++++
net/ceph/osd_client.c | 8 ++-
2 files changed, 197 insertions(+), 2 deletions(-)
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 656e169..6e56824 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -8,6 +8,7 @@
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/aio.h>
+#include <linux/falloc.h>
#include "super.h"
#include "mds_client.h"
@@ -882,6 +883,195 @@ out:
return offset;
}
+static inline void ceph_zero_partial_page(
+ struct inode *inode, loff_t offset, unsigned size)
+{
+ struct page *page;
+ pgoff_t index = offset >> PAGE_CACHE_SHIFT;
+
+ page = find_lock_page(inode->i_mapping, index);
+ if (page) {
+ wait_on_page_writeback(page);
+ zero_user(page, offset & (PAGE_CACHE_SIZE - 1), size);
+ unlock_page(page);
+ page_cache_release(page);
+ }
+}
+
+static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset,
+ loff_t length)
+{
+ loff_t nearly = round_up(offset, PAGE_CACHE_SIZE);
+ if (offset < nearly) {
+ loff_t size = nearly - offset;
+ if (length < size)
+ size = length;
+ ceph_zero_partial_page(inode, offset, size);
+ offset += size;
+ length -= size;
+ }
+ if (length >= PAGE_CACHE_SIZE) {
+ loff_t size = round_down(length, PAGE_CACHE_SIZE);
+ truncate_pagecache_range(inode, offset, offset + size - 1);
+ offset += size;
+ length -= size;
+ }
+ if (length)
+ ceph_zero_partial_page(inode, offset, length);
+}
+
+static int ceph_zero_partial_object(struct inode *inode,
+ loff_t offset, loff_t *length)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_osd_request *req;
+ int ret = 0;
+ loff_t zero = 0;
+ int op = CEPH_OSD_OP_ZERO;
+
+ if (!length) {
+ op = CEPH_OSD_OP_DELETE;
+ length = &zero;
+ }
+
+ req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+ ceph_vino(inode),
+ offset, length,
+ 1, op,
+ CEPH_OSD_FLAG_WRITE |
+ CEPH_OSD_FLAG_ONDISK,
+ NULL, 0, 0, false);
+ if (IS_ERR(req)) {
+ ret = PTR_ERR(req);
+ goto out;
+ }
+
+ ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap,
+ &inode->i_mtime);
+
+ ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+ if (!ret)
+ ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ ceph_osdc_put_request(req);
+
+out:
+ return ret;
+}
+
+static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length)
+{
+ int ret = 0;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ __s32 stripe_unit = ceph_file_layout_su(ci->i_layout);
+ __s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
+ __s32 object_size = ceph_file_layout_object_size(ci->i_layout);
+ loff_t object_set_size = (loff_t)object_size * stripe_count;
+
+ loff_t nearly = (offset + object_set_size - 1)
+ / object_set_size * object_set_size;
+ while (length && offset < nearly) {
+ loff_t size = length;
+ ret = ceph_zero_partial_object(inode, offset, &size);
+ if (ret < 0)
+ return ret;
+ offset += size;
+ length -= size;
+ }
+ while (length >= object_set_size) {
+ int i;
+ loff_t pos = offset;
+ for (i = 0; i < stripe_count; ++i) {
+ ret = ceph_zero_partial_object(inode, pos, NULL);
+ if (ret < 0)
+ return ret;
+ pos += stripe_unit;
+ }
+ offset += object_set_size;
+ length -= object_set_size;
+ }
+ while (length) {
+ loff_t size = length;
+ ret = ceph_zero_partial_object(inode, offset, &size);
+ if (ret < 0)
+ return ret;
+ offset += size;
+ length -= size;
+ }
+ return ret;
+}
+
+static long ceph_fallocate(struct file *file, int mode,
+ loff_t offset, loff_t length)
+{
+ struct ceph_file_info *fi = file->private_data;
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_osd_client *osdc =
+ &ceph_inode_to_client(inode)->client->osdc;
+ int want, got = 0;
+ int dirty;
+ int ret = 0;
+ loff_t endoff = 0;
+ loff_t size;
+
+ if (!S_ISREG(inode->i_mode))
+ return -EOPNOTSUPP;
+
+ if (IS_SWAPFILE(inode))
+ return -ETXTBSY;
+
+ mutex_lock(&inode->i_mutex);
+
+ if (ceph_snap(inode) != CEPH_NOSNAP) {
+ ret = -EROFS;
+ goto unlock;
+ }
+
+ if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) &&
+ !(mode & FALLOC_FL_PUNCH_HOLE)) {
+ ret = -ENOSPC;
+ goto unlock;
+ }
+
+ size = i_size_read(inode);
+ if (!(mode & FALLOC_FL_KEEP_SIZE))
+ endoff = offset + length;
+
+ if (fi->fmode & CEPH_FILE_MODE_LAZY)
+ want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+ else
+ want = CEPH_CAP_FILE_BUFFER;
+
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
+ if (ret < 0)
+ goto unlock;
+
+ if (mode & FALLOC_FL_PUNCH_HOLE) {
+ if (offset < size)
+ ceph_zero_pagecache_range(inode, offset, length);
+ ret = ceph_zero_objects(inode, offset, length);
+ } else if (endoff > size) {
+ truncate_pagecache_range(inode, size, -1);
+ if (ceph_inode_set_size(inode, endoff))
+ ceph_check_caps(ceph_inode(inode),
+ CHECK_CAPS_AUTHONLY, NULL);
+ }
+
+ if (!ret) {
+ spin_lock(&ci->i_ceph_lock);
+ dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+ spin_unlock(&ci->i_ceph_lock);
+ if (dirty)
+ __mark_inode_dirty(inode, dirty);
+ }
+
+ ceph_put_cap_refs(ci, got);
+unlock:
+ mutex_unlock(&inode->i_mutex);
+ return ret;
+}
+
const struct file_operations ceph_file_fops = {
.open = ceph_open,
.release = ceph_release,
@@ -898,5 +1088,6 @@ const struct file_operations ceph_file_fops = {
.splice_write = generic_file_splice_write,
.unlocked_ioctl = ceph_ioctl,
.compat_ioctl = ceph_ioctl,
+ .fallocate = ceph_fallocate,
};
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3a246a6..6cb076f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -503,7 +503,8 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
size_t payload_len = 0;
- BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+ BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
+ opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO);
op->extent.offset = offset;
op->extent.length = length;
@@ -631,6 +632,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
break;
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
+ case CEPH_OSD_OP_ZERO:
+ case CEPH_OSD_OP_DELETE:
if (src->op == CEPH_OSD_OP_WRITE)
request_data_len = src->extent.length;
dst->extent.offset = cpu_to_le64(src->extent.offset);
@@ -715,7 +718,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
u64 object_base;
int r;
- BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+ BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
+ opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO);
req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
GFP_NOFS);
--
1.7.9.5