2006-05-19 18:00:20

by Lever, Charles

Subject: [PATCH 0/6] Support scatter/gather I/O in NFS direct I/O path

These patches add support for asynchronous scatter/gather I/O to the NFS
direct I/O engine. They will apply cleanly to 2.6.17-rc4-mm1 with Badari's
API changes already applied.

Christoph, Badari, Trond, and Zach have already seen these at least once, but
they are in need of wider review and testing. I'd welcome their inclusion in
an mm kernel.

--
corporate: cel at netapp dot com
personal: chucklever at bigfoot dot com


2006-05-19 18:00:26

by Lever, Charles

Subject: [PATCH 1/6] nfs: "open code" the NFS direct write rescheduler

In order to support direct asynchronous vectored writes, the write
rescheduler in fs/nfs/direct.c must not depend on the I/O parameters in the
controlling nfs_direct_req structure. Refactor the write rescheduler to
use only the information in each nfs_write_data structure to redrive writes
if the server reboots before we can commit them.

Signed-off-by: Chuck Lever <[email protected]>
---

fs/nfs/direct.c | 71 +++++++++++++++++++++++++++++++++++++++++++------------
1 files changed, 56 insertions(+), 15 deletions(-)

diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 0980e43..0fde4bf 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -94,8 +94,19 @@ struct nfs_direct_req {
struct nfs_writeverf verf; /* unstable write verifier */
};

-static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, int sync);
static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
+static const struct rpc_call_ops nfs_write_direct_ops;
+
+static inline int nfs_direct_dec_outstanding(struct nfs_direct_req *dreq)
+{
+ int result;
+
+ spin_lock(&dreq->lock);
+ result = --dreq->outstanding;
+ spin_unlock(&dreq->lock);
+
+ return (result == 0);
+}

/**
* nfs_direct_IO - NFS address space operation for direct I/O
@@ -428,14 +439,52 @@ static void nfs_direct_free_writedata(st
#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
+ struct inode *inode = dreq->inode;
struct list_head *pos;

- list_splice_init(&dreq->rewrite_list, &dreq->list);
- list_for_each(pos, &dreq->list)
- dreq->outstanding++;
+ /*
+ * Prevent I/O completion while we're still rescheduling
+ */
+ dreq->outstanding++;
+
dreq->count = 0;
+ list_for_each(pos, &dreq->rewrite_list) {
+ struct nfs_write_data *data =
+ list_entry(pos, struct nfs_write_data, pages);
+
+ spin_lock(&dreq->lock);
+ dreq->outstanding++;
+ spin_unlock(&dreq->lock);
+
+ nfs_fattr_init(&data->fattr);
+ data->res.count = data->args.count;
+ memset(&data->verf, 0, sizeof(data->verf));
+
+ rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
+ &nfs_write_direct_ops, data);
+ NFS_PROTO(inode)->write_setup(data, FLUSH_STABLE);
+
+ data->task.tk_priority = RPC_PRIORITY_HIGH;
+ data->task.tk_cookie = (unsigned long) inode;

- nfs_direct_write_schedule(dreq, FLUSH_STABLE);
+ lock_kernel();
+ rpc_execute(&data->task);
+ unlock_kernel();
+
+ dfprintk(VFS, "NFS: %5u rescheduled direct write call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
+ data->task.tk_pid,
+ inode->i_sb->s_id,
+ (long long)NFS_FILEID(inode),
+ data->args.count,
+ (unsigned long long)data->args.offset);
+ }
+
+ /*
+ * If we raced with the rescheduled I/O and lost, then
+ * all the I/O is already complete.
+ */
+ if (nfs_direct_dec_outstanding(dreq))
+ nfs_direct_write_complete(dreq, inode);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
@@ -605,8 +654,6 @@ static void nfs_direct_write_result(stru
}
}
}
- /* In case we have to resend */
- data->args.stable = NFS_FILE_SYNC;

spin_unlock(&dreq->lock);
}
@@ -620,14 +667,8 @@ static void nfs_direct_write_release(voi
struct nfs_write_data *data = calldata;
struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;

- spin_lock(&dreq->lock);
- if (--dreq->outstanding) {
- spin_unlock(&dreq->lock);
- return;
- }
- spin_unlock(&dreq->lock);
-
- nfs_direct_write_complete(dreq, data->inode);
+ if (nfs_direct_dec_outstanding(dreq))
+ nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {

2006-05-19 18:00:55

by Lever, Charles

Subject: [PATCH 6/6] nfs: Support vector I/O throughout the NFS direct I/O path

Now that the preliminaries are complete, it is safe to loop over all the
segments in an iovec, dispatching all of the requests against a single
nfs_direct_req.

Test plan:
Specialized test applications using vectored synchronous and asynchronous
I/O.
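
As a rough illustration of the synchronous case, a minimal test might
look like this (a sketch only; the mount point, file name, and sizes
are made up, and the aio variant is not shown):

	#define _GNU_SOURCE		/* for O_DIRECT */
	#include <stdio.h>
	#include <stdlib.h>
	#include <fcntl.h>
	#include <unistd.h>
	#include <sys/uio.h>

	int main(void)
	{
		struct iovec iov[2];
		void *buf0, *buf1;
		ssize_t n;
		int fd;

		/* page-aligned buffers keep O_DIRECT happy */
		if (posix_memalign(&buf0, 4096, 4096) ||
		    posix_memalign(&buf1, 4096, 4096))
			return 1;

		fd = open("/mnt/nfs/testfile", O_RDONLY | O_DIRECT);
		if (fd < 0)
			return 1;

		iov[0].iov_base = buf0;
		iov[0].iov_len = 4096;
		iov[1].iov_base = buf1;
		iov[1].iov_len = 4096;

		/* both segments are dispatched against one nfs_direct_req */
		n = readv(fd, iov, 2);
		printf("readv returned %zd\n", n);

		close(fd);
		return 0;
	}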

Signed-off-by: Chuck Lever <[email protected]>
---

fs/nfs/direct.c | 78 +++++++++++++++++++++++++++++++++++--------------------
1 files changed, 50 insertions(+), 28 deletions(-)

diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 3d4ded0..e62b905 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -281,8 +281,6 @@ static int nfs_direct_read_schedule(stru
unsigned int pgbase;
unsigned int started = 0;

- dreq->outstanding++;
-
pgbase = user_addr & ~PAGE_MASK;
do {
struct nfs_read_data *data;
@@ -351,13 +349,30 @@ static int nfs_direct_read_schedule(stru
count -= bytes;
} while (count != 0);

+ return started;
+}
+
+static int nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq, const struct iovec *iov, unsigned long nr_segs, loff_t pos)
+{
+ ssize_t started = 0;
+ unsigned long seg;
+
+ dreq->outstanding++;
+
+ for (seg = 0; seg < nr_segs; seg++) {
+ unsigned long user_addr = (unsigned long) iov[seg].iov_base;
+ size_t count = iov[seg].iov_len;
+ started += nfs_direct_read_schedule(dreq, user_addr, count, pos);
+ pos += count;
+ }
+
if (nfs_direct_dec_outstanding(dreq))
nfs_direct_complete(dreq);

return started;
}

-static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos)
+static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t pos)
{
ssize_t result = 0;
sigset_t oldset;
@@ -375,7 +390,7 @@ static ssize_t nfs_direct_read(struct ki
dreq->iocb = iocb;

rpc_clnt_sigmask(clnt, &oldset);
- if (nfs_direct_read_schedule(dreq, user_addr, count, pos))
+ if (nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos))
result = nfs_direct_wait(dreq);
rpc_clnt_sigunmask(clnt, &oldset);

@@ -606,8 +621,6 @@ static int nfs_direct_write_schedule(str
unsigned int pgbase;
unsigned int started = 0;

- dreq->outstanding++;
-
pgbase = user_addr & ~PAGE_MASK;
do {
struct nfs_write_data *data;
@@ -679,13 +692,31 @@ static int nfs_direct_write_schedule(str
count -= bytes;
} while (count != 0);

+ return started;
+}
+
+
+static int nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq, const struct iovec *iov, unsigned long nr_segs, loff_t pos, int sync)
+{
+ ssize_t started = 0;
+ unsigned long seg;
+
+ dreq->outstanding++;
+
+ for (seg = 0; seg < nr_segs; seg++) {
+ unsigned long user_addr = (unsigned long) iov[seg].iov_base;
+ size_t count = iov[seg].iov_len;
+ started += nfs_direct_write_schedule(dreq, user_addr, count, pos, sync);
+ pos += count;
+ }
+
if (nfs_direct_dec_outstanding(dreq))
- nfs_direct_write_complete(dreq, inode);
+ nfs_direct_write_complete(dreq, dreq->inode);

return started;
}

-static ssize_t nfs_direct_write(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos)
+static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t pos, size_t count)
{
ssize_t result = 0;
sigset_t oldset;
@@ -711,7 +742,7 @@ static ssize_t nfs_direct_write(struct k
nfs_begin_data_update(inode);

rpc_clnt_sigmask(clnt, &oldset);
- if (nfs_direct_write_schedule(dreq, user_addr, count, pos, sync))
+ if (nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync))
result = nfs_direct_wait(dreq);
rpc_clnt_sigunmask(clnt, &oldset);

@@ -775,28 +806,22 @@ ssize_t nfs_file_direct_read(struct kioc
ssize_t retval;
struct file *file = iocb->ki_filp;
struct address_space *mapping = file->f_mapping;
- /* XXX: temporary */
- const char __user *buf = iov[0].iov_base;
- size_t count = iov[0].iov_len;

retval = check_access_ok(VERIFY_WRITE, iov, nr_segs);
if (retval <= 0)
goto out;
nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, retval);

- dprintk("nfs: direct read(%s/%s, %lu@%Ld)\n",
+ dprintk("nfs: direct read(%s/%s, %zd@%Ld)\n",
file->f_dentry->d_parent->d_name.name,
file->f_dentry->d_name.name,
- (unsigned long) count, (long long) pos);
-
- if (nr_segs != 1)
- return -EINVAL;
+ retval, (long long) pos);

retval = nfs_sync_mapping(mapping);
if (retval)
goto out;

- retval = nfs_direct_read(iocb, (unsigned long) buf, count, pos);
+ retval = nfs_direct_read(iocb, iov, nr_segs, pos);
if (retval > 0)
iocb->ki_pos = pos + retval;

@@ -832,35 +857,32 @@ out:
ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
unsigned long nr_segs, loff_t pos)
{
- ssize_t retval;
+ ssize_t retval, count;
struct file *file = iocb->ki_filp;
struct address_space *mapping = file->f_mapping;
- /* XXX: temporary */
- const char __user *buf = iov[0].iov_base;
- size_t count = iov[0].iov_len;

retval = check_access_ok(VERIFY_READ, iov, nr_segs);
if (retval <= 0)
goto out;
nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, retval);

- dfprintk(VFS, "nfs: direct write(%s/%s, %lu@%Ld)\n",
+ dfprintk(VFS, "nfs: direct write(%s/%s, %zd@%Ld)\n",
file->f_dentry->d_parent->d_name.name,
file->f_dentry->d_name.name,
- (unsigned long) count, (long long) pos);
-
- if (nr_segs != 1)
- return -EINVAL;
+ retval, (long long) pos);

+ count = retval;
retval = generic_write_checks(file, &pos, &count, 0);
if (retval)
goto out;
+ if (!count)
+ goto out; /* return 0 */

retval = nfs_sync_mapping(mapping);
if (retval)
goto out;

- retval = nfs_direct_write(iocb, (unsigned long) buf, count, pos);
+ retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);

/*
* XXX: nfs_end_data_update() already ensures this file's

2006-05-19 18:00:42

by Lever, Charles

Subject: [PATCH 5/6] nfs: check all iov segments for correct memory access rights

Add Badari's function to check access for all the segments in a passed-in
iov. We can use the total byte count later.

Signed-off-by: Chuck Lever <[email protected]>
---

fs/nfs/direct.c | 65 +++++++++++++++++++++++++++++++++++--------------------
1 files changed, 41 insertions(+), 24 deletions(-)

diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 9cd6b96..3d4ded0 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -374,7 +374,6 @@ static ssize_t nfs_direct_read(struct ki
if (!is_sync_kiocb(iocb))
dreq->iocb = iocb;

- nfs_add_stats(inode, NFSIOS_DIRECTREADBYTES, count);
rpc_clnt_sigmask(clnt, &oldset);
if (nfs_direct_read_schedule(dreq, user_addr, count, pos))
result = nfs_direct_wait(dreq);
@@ -709,8 +708,6 @@ static ssize_t nfs_direct_write(struct k
if (!is_sync_kiocb(iocb))
dreq->iocb = iocb;

- nfs_add_stats(inode, NFSIOS_DIRECTWRITTENBYTES, count);
-
nfs_begin_data_update(inode);

rpc_clnt_sigmask(clnt, &oldset);
@@ -721,6 +718,36 @@ static ssize_t nfs_direct_write(struct k
return result;
}

+/*
+ * Check:
+ * 1. All bytes in the user buffers are properly accessible
+ * 2. The resulting number of bytes won't overflow ssize_t
+ */
+static ssize_t check_access_ok(int type, const struct iovec *iov, unsigned long nr_segs)
+{
+ ssize_t count = 0;
+ ssize_t retval = -EINVAL;
+ unsigned long seg;
+
+ for (seg = 0; seg < nr_segs; seg++) {
+ void __user *buf = iov[seg].iov_base;
+ ssize_t len = (ssize_t) iov[seg].iov_len;
+
+ if (len < 0) /* size_t not fitting an ssize_t .. */
+ goto out;
+ if (unlikely(!access_ok(type, buf, len))) {
+ retval = -EFAULT;
+ goto out;
+ }
+ count += len;
+ if (count < 0) /* math overflow on the ssize_t */
+ goto out;
+ }
+ retval = count;
+out:
+ return retval;
+}
+
/**
* nfs_file_direct_read - file direct read operation for NFS files
* @iocb: target I/O control block
@@ -745,13 +772,18 @@ static ssize_t nfs_direct_write(struct k
ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
unsigned long nr_segs, loff_t pos)
{
- ssize_t retval = -EINVAL;
+ ssize_t retval;
struct file *file = iocb->ki_filp;
struct address_space *mapping = file->f_mapping;
/* XXX: temporary */
const char __user *buf = iov[0].iov_base;
size_t count = iov[0].iov_len;

+ retval = check_access_ok(VERIFY_WRITE, iov, nr_segs);
+ if (retval <= 0)
+ goto out;
+ nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, retval);
+
dprintk("nfs: direct read(%s/%s, %lu@%Ld)\n",
file->f_dentry->d_parent->d_name.name,
file->f_dentry->d_name.name,
@@ -760,15 +792,6 @@ ssize_t nfs_file_direct_read(struct kioc
if (nr_segs != 1)
return -EINVAL;

- if (count < 0)
- goto out;
- retval = -EFAULT;
- if (!access_ok(VERIFY_WRITE, buf, count))
- goto out;
- retval = 0;
- if (!count)
- goto out;
-
retval = nfs_sync_mapping(mapping);
if (retval)
goto out;
@@ -816,6 +839,11 @@ ssize_t nfs_file_direct_write(struct kio
const char __user *buf = iov[0].iov_base;
size_t count = iov[0].iov_len;

+ retval = check_access_ok(VERIFY_READ, iov, nr_segs);
+ if (retval <= 0)
+ goto out;
+ nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, retval);
+
dfprintk(VFS, "nfs: direct write(%s/%s, %lu@%Ld)\n",
file->f_dentry->d_parent->d_name.name,
file->f_dentry->d_name.name,
@@ -828,17 +856,6 @@ ssize_t nfs_file_direct_write(struct kio
if (retval)
goto out;

- retval = -EINVAL;
- if ((ssize_t) count < 0)
- goto out;
- retval = 0;
- if (!count)
- goto out;
-
- retval = -EFAULT;
- if (!access_ok(VERIFY_READ, buf, count))
- goto out;
-
retval = nfs_sync_mapping(mapping);
if (retval)
goto out;

2006-05-19 18:00:57

by Lever, Charles

Subject: [PATCH 3/6] nfs: Eliminate nfs_get_user_pages()

Neil Brown observed that the kmalloc() in nfs_get_user_pages() is more
likely to fail if the I/O is large enough to require the allocation of more
than a single page to keep track of all the pinned pages in the user's
buffer.
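
(To put numbers on that, assuming a 64-bit client with 4 KiB pages: one
page holds 512 page pointers, covering 2 MiB of I/O, so a single 32 MiB
request needs an 8192-entry, 64 KiB array, an order-4 allocation that is
quite likely to fail under memory pressure.)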

Instead of tracking one large page array per dreq/iocb, track pages per
nfs_read/write_data, just like the cached I/O path does. An array for
pages is already allocated for us by nfs_readdata_alloc() (and the write
and commit equivalents).

This is also required for adding support for vectored I/O to the NFS direct
I/O path.

The original reason to pin the user buffer and allocate all the NFS data
structures before trying to schedule I/O was to ensure all needed resources
are allocated on the client before starting to send requests. This reduces
the chance that resource exhaustion on the client will cause a short read
or write.

On the other hand, for an application making very large I/O requests,
this means that it will be nearly impossible for the application
to make forward progress on a resource-limited client.

Thus, moving the buffer pinning functionality into the I/O scheduling
loops should be good for scalability. The next patch will do the same for
NFS data structure allocation.

Signed-off-by: Chuck Lever <[email protected]>
---

fs/nfs/direct.c | 173 ++++++++++++++++++++++++-----------------------
include/linux/nfs_xdr.h | 2 +
2 files changed, 91 insertions(+), 84 deletions(-)

diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index aef0b98..ae451db 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -73,8 +73,6 @@ struct nfs_direct_req {
struct nfs_open_context *ctx; /* file open context info */
struct kiocb * iocb; /* controlling i/o request */
struct inode * inode; /* target file of i/o */
- struct page ** pages; /* pages in our buffer */
- unsigned int npages; /* count of pages */

/* completion state */
spinlock_t lock; /* protect completion state */
@@ -105,6 +103,16 @@ static inline int nfs_direct_dec_outstan
return (result == 0);
}

+static inline unsigned long nfs_count_pages(unsigned long user_addr, size_t size)
+{
+ unsigned long page_count;
+
+ page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+ page_count -= user_addr >> PAGE_SHIFT;
+
+ return page_count;
+}
+
/**
* nfs_direct_IO - NFS address space operation for direct I/O
* @rw: direction (read or write)
@@ -127,50 +135,21 @@ ssize_t nfs_direct_IO(int rw, struct kio
return -EINVAL;
}

-static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty)
+static void nfs_dirty_user_pages(struct page **pages, int npages)
{
int i;
for (i = 0; i < npages; i++) {
struct page *page = pages[i];
- if (do_dirty && !PageCompound(page))
+ if (!PageCompound(page))
set_page_dirty_lock(page);
- page_cache_release(page);
}
- kfree(pages);
}

-static inline int nfs_get_user_pages(int rw, unsigned long user_addr, size_t size, struct page ***pages)
+static void nfs_release_user_pages(struct page **pages, int npages)
{
- int result = -ENOMEM;
- unsigned long page_count;
- size_t array_size;
-
- page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
- page_count -= user_addr >> PAGE_SHIFT;
-
- array_size = (page_count * sizeof(struct page *));
- *pages = kmalloc(array_size, GFP_KERNEL);
- if (*pages) {
- down_read(&current->mm->mmap_sem);
- result = get_user_pages(current, current->mm, user_addr,
- page_count, (rw == READ), 0,
- *pages, NULL);
- up_read(&current->mm->mmap_sem);
- if (result != page_count) {
- /*
- * If we got fewer pages than expected from
- * get_user_pages(), the user buffer runs off the
- * end of a mapping; return EFAULT.
- */
- if (result >= 0) {
- nfs_free_user_pages(*pages, result, 0);
- result = -EFAULT;
- } else
- kfree(*pages);
- *pages = NULL;
- }
- }
- return result;
+ int i;
+ for (i = 0; i < npages; i++)
+ page_cache_release(pages[i]);
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
@@ -229,18 +208,11 @@ out:
}

/*
- * We must hold a reference to all the pages in this direct read request
- * until the RPCs complete. This could be long *after* we are woken up in
- * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
- *
- * In addition, synchronous I/O uses a stack-allocated iocb. Thus we
- * can't trust the iocb is still valid here if this is a synchronous
- * request. If the waiter is woken prematurely, the iocb is long gone.
+ * Synchronous I/O uses a stack-allocated iocb. Thus we can't trust
+ * the iocb is still valid here if this is a synchronous request.
*/
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
- nfs_free_user_pages(dreq->pages, dreq->npages, 1);
-
if (dreq->iocb) {
long res = (long) dreq->error;
if (!res)
@@ -295,6 +267,11 @@ static struct nfs_direct_req *nfs_direct
return dreq;
}

+/*
+ * We must hold a reference to all the pages in this direct read request
+ * until the RPCs complete. This could be long *after* we are woken up in
+ * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
+ */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
struct nfs_read_data *data = calldata;
@@ -303,6 +280,9 @@ static void nfs_direct_read_result(struc
if (nfs_readpage_result(task, data) != 0)
return;

+ nfs_dirty_user_pages(data->pagevec, data->npages);
+ nfs_release_user_pages(data->pagevec, data->npages);
+
spin_lock(&dreq->lock);

if (likely(task->tk_status >= 0))
@@ -316,6 +296,7 @@ static void nfs_direct_read_result(struc
}

spin_unlock(&dreq->lock);
+
nfs_direct_complete(dreq);
}

@@ -333,14 +314,13 @@ static void nfs_direct_read_schedule(str
struct nfs_open_context *ctx = dreq->ctx;
struct inode *inode = ctx->dentry->d_inode;
struct list_head *list = &dreq->list;
- struct page **pages = dreq->pages;
size_t rsize = NFS_SERVER(inode)->rsize;
- unsigned int curpage, pgbase;
+ unsigned int pgbase;
+ unsigned long result;
+ struct nfs_read_data *data;

- curpage = 0;
pgbase = user_addr & ~PAGE_MASK;
do {
- struct nfs_read_data *data;
size_t bytes;

bytes = rsize;
@@ -351,13 +331,21 @@ static void nfs_direct_read_schedule(str
data = list_entry(list->next, struct nfs_read_data, pages);
list_del_init(&data->pages);

+ data->npages = nfs_count_pages(user_addr, bytes);
+ down_read(&current->mm->mmap_sem);
+ result = get_user_pages(current, current->mm, user_addr,
+ data->npages, 1, 0, data->pagevec, NULL);
+ up_read(&current->mm->mmap_sem);
+ if (unlikely(result < data->npages))
+ goto out_err;
+
data->inode = inode;
data->cred = ctx->cred;
data->args.fh = NFS_FH(inode);
data->args.context = ctx;
data->args.offset = pos;
data->args.pgbase = pgbase;
- data->args.pages = &pages[curpage];
+ data->args.pages = data->pagevec;
data->args.count = bytes;
data->res.fattr = &data->fattr;
data->res.eof = 0;
@@ -380,17 +368,32 @@ static void nfs_direct_read_schedule(str
bytes,
(unsigned long long)data->args.offset);

+ user_addr += bytes;
pos += bytes;
pgbase += bytes;
- curpage += pgbase >> PAGE_SHIFT;
pgbase &= ~PAGE_MASK;

count -= bytes;
} while (count != 0);
BUG_ON(!list_empty(list));
+ return;
+
+ /* If get_user_pages fails, we stop sending reads. Read length
+ * accounting is handled by nfs_direct_read_result(). */
+out_err:
+ nfs_release_user_pages(data->pagevec, result);
+
+ list_add(&data->pages, list);
+ while (!list_empty(list)) {
+ data = list_entry(list->next, struct nfs_read_data, pages);
+ list_del(&data->pages);
+ nfs_readdata_free(data);
+ if (nfs_direct_dec_outstanding(dreq))
+ nfs_direct_complete(dreq);
+ }
}

-static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos, struct page **pages, unsigned int nr_pages)
+static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos)
{
ssize_t result;
sigset_t oldset;
@@ -402,8 +405,6 @@ static ssize_t nfs_direct_read(struct ki
if (!dreq)
return -ENOMEM;

- dreq->pages = pages;
- dreq->npages = nr_pages;
dreq->inode = inode;
dreq->ctx = get_nfs_open_context((struct nfs_open_context *)iocb->ki_filp->private_data);
if (!is_sync_kiocb(iocb))
@@ -424,6 +425,7 @@ static void nfs_direct_free_writedata(st
while (!list_empty(&dreq->list)) {
struct nfs_write_data *data = list_entry(dreq->list.next, struct nfs_write_data, pages);
list_del(&data->pages);
+ nfs_release_user_pages(data->pagevec, data->npages);
nfs_writedata_release(data);
}
}
@@ -677,14 +679,13 @@ static void nfs_direct_write_schedule(st
struct nfs_open_context *ctx = dreq->ctx;
struct inode *inode = ctx->dentry->d_inode;
struct list_head *list = &dreq->list;
- struct page **pages = dreq->pages;
size_t wsize = NFS_SERVER(inode)->wsize;
- unsigned int curpage, pgbase;
+ unsigned int pgbase;
+ unsigned long result;
+ struct nfs_write_data *data;

- curpage = 0;
pgbase = user_addr & ~PAGE_MASK;
do {
- struct nfs_write_data *data;
size_t bytes;

bytes = wsize;
@@ -693,6 +694,15 @@ static void nfs_direct_write_schedule(st

BUG_ON(list_empty(list));
data = list_entry(list->next, struct nfs_write_data, pages);
+
+ data->npages = nfs_count_pages(user_addr, bytes);
+ down_read(&current->mm->mmap_sem);
+ result = get_user_pages(current, current->mm, user_addr,
+ data->npages, 0, 0, data->pagevec, NULL);
+ up_read(&current->mm->mmap_sem);
+ if (unlikely(result < data->npages))
+ goto out_err;
+
list_move_tail(&data->pages, &dreq->rewrite_list);

data->inode = inode;
@@ -701,7 +711,7 @@ static void nfs_direct_write_schedule(st
data->args.context = ctx;
data->args.offset = pos;
data->args.pgbase = pgbase;
- data->args.pages = &pages[curpage];
+ data->args.pages = data->pagevec;
data->args.count = bytes;
data->res.fattr = &data->fattr;
data->res.count = bytes;
@@ -725,17 +735,32 @@ static void nfs_direct_write_schedule(st
bytes,
(unsigned long long)data->args.offset);

+ user_addr += bytes;
pos += bytes;
pgbase += bytes;
- curpage += pgbase >> PAGE_SHIFT;
pgbase &= ~PAGE_MASK;

count -= bytes;
} while (count != 0);
BUG_ON(!list_empty(list));
+ return;
+
+ /* If get_user_pages fails, we stop sending writes. Write length
+ * accounting is handled by nfs_direct_write_result(). */
+out_err:
+ nfs_release_user_pages(data->pagevec, result);
+
+ list_add(&data->pages, list);
+ while (!list_empty(list)) {
+ data = list_entry(list->next, struct nfs_write_data, pages);
+ list_del(&data->pages);
+ nfs_writedata_free(data);
+ if (nfs_direct_dec_outstanding(dreq))
+ nfs_direct_write_complete(dreq, inode);
+ }
}

-static ssize_t nfs_direct_write(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos, struct page **pages, int nr_pages)
+static ssize_t nfs_direct_write(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos)
{
ssize_t result;
sigset_t oldset;
@@ -751,8 +776,6 @@ static ssize_t nfs_direct_write(struct k
if (dreq->commit_data == NULL || count < wsize)
sync = FLUSH_STABLE;

- dreq->pages = pages;
- dreq->npages = nr_pages;
dreq->inode = inode;
dreq->ctx = get_nfs_open_context((struct nfs_open_context *)iocb->ki_filp->private_data);
if (!is_sync_kiocb(iocb))
@@ -795,8 +818,6 @@ ssize_t nfs_file_direct_read(struct kioc
unsigned long nr_segs, loff_t pos)
{
ssize_t retval = -EINVAL;
- int page_count;
- struct page **pages;
struct file *file = iocb->ki_filp;
struct address_space *mapping = file->f_mapping;
/* XXX: temporary */
@@ -824,14 +845,7 @@ ssize_t nfs_file_direct_read(struct kioc
if (retval)
goto out;

- retval = nfs_get_user_pages(READ, (unsigned long) buf,
- count, &pages);
- if (retval < 0)
- goto out;
- page_count = retval;
-
- retval = nfs_direct_read(iocb, (unsigned long) buf, count, pos,
- pages, page_count);
+ retval = nfs_direct_read(iocb, (unsigned long) buf, count, pos);
if (retval > 0)
iocb->ki_pos = pos + retval;

@@ -868,8 +882,6 @@ ssize_t nfs_file_direct_write(struct kio
unsigned long nr_segs, loff_t pos)
{
ssize_t retval;
- int page_count;
- struct page **pages;
struct file *file = iocb->ki_filp;
struct address_space *mapping = file->f_mapping;
/* XXX: temporary */
@@ -903,14 +915,7 @@ ssize_t nfs_file_direct_write(struct kio
if (retval)
goto out;

- retval = nfs_get_user_pages(WRITE, (unsigned long) buf,
- count, &pages);
- if (retval < 0)
- goto out;
- page_count = retval;
-
- retval = nfs_direct_write(iocb, (unsigned long) buf, count,
- pos, pages, page_count);
+ retval = nfs_direct_write(iocb, (unsigned long) buf, count, pos);

/*
* XXX: nfs_end_data_update() already ensures this file's
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index 7c7320f..102b5c8 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -729,6 +729,7 @@ struct nfs_read_data {
struct list_head pages; /* Coalesced read requests */
struct nfs_page *req; /* multi ops per nfs_page */
struct page **pagevec;
+ int npages; /* active pages in pagevec */
struct nfs_readargs args;
struct nfs_readres res;
#ifdef CONFIG_NFS_V4
@@ -747,6 +748,7 @@ struct nfs_write_data {
struct list_head pages; /* Coalesced requests we wish to flush */
struct nfs_page *req; /* multi ops per nfs_page */
struct page **pagevec;
+ int npages; /* active pages in pagevec */
struct nfs_writeargs args; /* argument struct */
struct nfs_writeres res; /* result struct */
#ifdef CONFIG_NFS_V4

2006-05-19 18:00:55

by Lever, Charles

Subject: [PATCH 4/6] nfs: alloc nfs_read/write_data as direct I/O is scheduled

Re-arrange the logic in the NFS direct I/O path so that nfs_read/write_data
structs are allocated just before they are scheduled, rather than
allocating them all at once before we start scheduling requests.

This will make it easier to support vectored asynchronous direct I/O.

Signed-off-by: Chuck Lever <[email protected]>
---

fs/nfs/direct.c | 230 +++++++++++++++++++------------------------------------
1 files changed, 79 insertions(+), 151 deletions(-)

diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index ae451db..9cd6b96 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -68,8 +68,6 @@ struct nfs_direct_req {
struct kref kref; /* release manager */

/* I/O parameters */
- struct list_head list, /* nfs_read/write_data structs */
- rewrite_list; /* saved nfs_write_data structs */
struct nfs_open_context *ctx; /* file open context info */
struct kiocb * iocb; /* controlling i/o request */
struct inode * inode; /* target file of i/o */
@@ -82,6 +80,7 @@ struct nfs_direct_req {
struct completion completion; /* wait for i/o completion */

/* commit state */
+ struct list_head rewrite_list; /* saved nfs_write_data structs */
struct nfs_write_data * commit_data; /* special write_data for commits */
int flags;
#define NFS_ODIRECT_DO_COMMIT (1) /* an unstable reply was received */
@@ -113,6 +112,11 @@ static inline unsigned long nfs_count_pa
return page_count;
}

+static inline unsigned int nfs_max_pages(unsigned int xsize)
+{
+ return (xsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
+}
+
/**
* nfs_direct_IO - NFS address space operation for direct I/O
* @rw: direction (read or write)
@@ -161,8 +165,8 @@ static inline struct nfs_direct_req *nfs
return NULL;

kref_init(&dreq->kref);
+ kref_get(&dreq->kref);
init_completion(&dreq->completion);
- INIT_LIST_HEAD(&dreq->list);
INIT_LIST_HEAD(&dreq->rewrite_list);
dreq->iocb = NULL;
dreq->ctx = NULL;
@@ -225,49 +229,6 @@ static void nfs_direct_complete(struct n
}

/*
- * Note we also set the number of requests we have in the dreq when we are
- * done. This prevents races with I/O completion so we will always wait
- * until all requests have been dispatched and completed.
- */
-static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, size_t rsize)
-{
- struct list_head *list;
- struct nfs_direct_req *dreq;
- unsigned int rpages = (rsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
-
- dreq = nfs_direct_req_alloc();
- if (!dreq)
- return NULL;
-
- list = &dreq->list;
- for(;;) {
- struct nfs_read_data *data = nfs_readdata_alloc(rpages);
-
- if (unlikely(!data)) {
- while (!list_empty(list)) {
- data = list_entry(list->next,
- struct nfs_read_data, pages);
- list_del(&data->pages);
- nfs_readdata_free(data);
- }
- kref_put(&dreq->kref, nfs_direct_req_release);
- return NULL;
- }
-
- INIT_LIST_HEAD(&data->pages);
- list_add(&data->pages, list);
-
- data->req = (struct nfs_page *) dreq;
- dreq->outstanding++;
- if (nbytes <= rsize)
- break;
- nbytes -= rsize;
- }
- kref_get(&dreq->kref);
- return dreq;
-}
-
-/*
* We must hold a reference to all the pages in this direct read request
* until the RPCs complete. This could be long *after* we are woken up in
* nfs_direct_wait (for instance, if someone hits ^C on a slow server).
@@ -306,39 +267,53 @@ static const struct rpc_call_ops nfs_rea
};

/*
- * For each nfs_read_data struct that was allocated on the list, dispatch
- * an NFS READ operation
+ * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
+ * operation. If nfs_readdata_alloc() or get_user_pages() fails,
+ * bail and stop sending more reads. Read length accounting is
+ * handled automatically by nfs_direct_read_result().
*/
-static void nfs_direct_read_schedule(struct nfs_direct_req *dreq, unsigned long user_addr, size_t count, loff_t pos)
+static int nfs_direct_read_schedule(struct nfs_direct_req *dreq, unsigned long user_addr, size_t count, loff_t pos)
{
struct nfs_open_context *ctx = dreq->ctx;
struct inode *inode = ctx->dentry->d_inode;
- struct list_head *list = &dreq->list;
size_t rsize = NFS_SERVER(inode)->rsize;
+ unsigned int rpages = nfs_max_pages(rsize);
unsigned int pgbase;
- unsigned long result;
- struct nfs_read_data *data;
+ unsigned int started = 0;
+
+ dreq->outstanding++;

pgbase = user_addr & ~PAGE_MASK;
do {
+ struct nfs_read_data *data;
size_t bytes;
+ unsigned long result;
+
+ data = nfs_readdata_alloc(rpages);
+ if (unlikely(!data))
+ break;

bytes = rsize;
if (count < rsize)
bytes = count;

- BUG_ON(list_empty(list));
- data = list_entry(list->next, struct nfs_read_data, pages);
- list_del_init(&data->pages);
-
data->npages = nfs_count_pages(user_addr, bytes);
down_read(&current->mm->mmap_sem);
result = get_user_pages(current, current->mm, user_addr,
data->npages, 1, 0, data->pagevec, NULL);
up_read(&current->mm->mmap_sem);
- if (unlikely(result < data->npages))
- goto out_err;
+ if (unlikely(result < data->npages)) {
+ nfs_release_user_pages(data->pagevec, result);
+ nfs_readdata_release(data);
+ break;
+ }

+ spin_lock(&dreq->lock);
+ dreq->outstanding++;
+ spin_unlock(&dreq->lock);
+ started++; /* indicate there is something to wait for */
+
+ data->req = (struct nfs_page *) dreq;
data->inode = inode;
data->cred = ctx->cred;
data->args.fh = NFS_FH(inode);
@@ -375,33 +350,22 @@ static void nfs_direct_read_schedule(str

count -= bytes;
} while (count != 0);
- BUG_ON(!list_empty(list));
- return;

- /* If get_user_pages fails, we stop sending reads. Read length
- * accounting is handled by nfs_direct_read_result(). */
-out_err:
- nfs_release_user_pages(data->pagevec, result);
-
- list_add(&data->pages, list);
- while (!list_empty(list)) {
- data = list_entry(list->next, struct nfs_read_data, pages);
- list_del(&data->pages);
- nfs_readdata_free(data);
- if (nfs_direct_dec_outstanding(dreq))
- nfs_direct_complete(dreq);
- }
+ if (nfs_direct_dec_outstanding(dreq))
+ nfs_direct_complete(dreq);
+
+ return started;
}

static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos)
{
- ssize_t result;
+ ssize_t result = 0;
sigset_t oldset;
struct inode *inode = iocb->ki_filp->f_mapping->host;
struct rpc_clnt *clnt = NFS_CLIENT(inode);
struct nfs_direct_req *dreq;

- dreq = nfs_direct_read_alloc(count, NFS_SERVER(inode)->rsize);
+ dreq = nfs_direct_req_alloc();
if (!dreq)
return -ENOMEM;

@@ -412,8 +376,8 @@ static ssize_t nfs_direct_read(struct ki

nfs_add_stats(inode, NFSIOS_DIRECTREADBYTES, count);
rpc_clnt_sigmask(clnt, &oldset);
- nfs_direct_read_schedule(dreq, user_addr, count, pos);
- result = nfs_direct_wait(dreq);
+ if (nfs_direct_read_schedule(dreq, user_addr, count, pos))
+ result = nfs_direct_wait(dreq);
rpc_clnt_sigunmask(clnt, &oldset);

return result;
@@ -421,9 +385,8 @@ static ssize_t nfs_direct_read(struct ki

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
- list_splice_init(&dreq->rewrite_list, &dreq->list);
- while (!list_empty(&dreq->list)) {
- struct nfs_write_data *data = list_entry(dreq->list.next, struct nfs_write_data, pages);
+ while (!list_empty(&dreq->rewrite_list)) {
+ struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
list_del(&data->pages);
nfs_release_user_pages(data->pagevec, data->npages);
nfs_writedata_release(data);
@@ -578,47 +541,6 @@ static void nfs_direct_write_complete(st
}
#endif

-static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize)
-{
- struct list_head *list;
- struct nfs_direct_req *dreq;
- unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
-
- dreq = nfs_direct_req_alloc();
- if (!dreq)
- return NULL;
-
- list = &dreq->list;
- for(;;) {
- struct nfs_write_data *data = nfs_writedata_alloc(wpages);
-
- if (unlikely(!data)) {
- while (!list_empty(list)) {
- data = list_entry(list->next,
- struct nfs_write_data, pages);
- list_del(&data->pages);
- nfs_writedata_free(data);
- }
- kref_put(&dreq->kref, nfs_direct_req_release);
- return NULL;
- }
-
- INIT_LIST_HEAD(&data->pages);
- list_add(&data->pages, list);
-
- data->req = (struct nfs_page *) dreq;
- dreq->outstanding++;
- if (nbytes <= wsize)
- break;
- nbytes -= wsize;
- }
-
- nfs_alloc_commit_data(dreq);
-
- kref_get(&dreq->kref);
- return dreq;
-}
-
static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
struct nfs_write_data *data = calldata;
@@ -671,40 +593,55 @@ static const struct rpc_call_ops nfs_wri
};

/*
- * For each nfs_write_data struct that was allocated on the list, dispatch
- * an NFS WRITE operation
+ * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
+ * operation. If nfs_writedata_alloc() or get_user_pages() fails,
+ * bail and stop sending more writes. Write length accounting is
+ * handled automatically by nfs_direct_write_result().
*/
-static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, unsigned long user_addr, size_t count, loff_t pos, int sync)
+static int nfs_direct_write_schedule(struct nfs_direct_req *dreq, unsigned long user_addr, size_t count, loff_t pos, int sync)
{
struct nfs_open_context *ctx = dreq->ctx;
struct inode *inode = ctx->dentry->d_inode;
- struct list_head *list = &dreq->list;
size_t wsize = NFS_SERVER(inode)->wsize;
+ unsigned int wpages = nfs_max_pages(wsize);
unsigned int pgbase;
- unsigned long result;
- struct nfs_write_data *data;
+ unsigned int started = 0;
+
+ dreq->outstanding++;

pgbase = user_addr & ~PAGE_MASK;
do {
+ struct nfs_write_data *data;
size_t bytes;
+ unsigned long result;
+
+ data = nfs_writedata_alloc(wpages);
+ if (unlikely(!data))
+ break;

bytes = wsize;
if (count < wsize)
bytes = count;

- BUG_ON(list_empty(list));
- data = list_entry(list->next, struct nfs_write_data, pages);
-
data->npages = nfs_count_pages(user_addr, bytes);
down_read(&current->mm->mmap_sem);
result = get_user_pages(current, current->mm, user_addr,
data->npages, 0, 0, data->pagevec, NULL);
up_read(&current->mm->mmap_sem);
- if (unlikely(result < data->npages))
- goto out_err;
+ if (unlikely(result < data->npages)) {
+ nfs_release_user_pages(data->pagevec, result);
+ nfs_writedata_release(data);
+ break;
+ }
+
+ spin_lock(&dreq->lock);
+ dreq->outstanding++;
+ spin_unlock(&dreq->lock);
+ started++; /* indicate there is something to wait for */

list_move_tail(&data->pages, &dreq->rewrite_list);

+ data->req = (struct nfs_page *) dreq;
data->inode = inode;
data->cred = ctx->cred;
data->args.fh = NFS_FH(inode);
@@ -742,27 +679,16 @@ static void nfs_direct_write_schedule(st

count -= bytes;
} while (count != 0);
- BUG_ON(!list_empty(list));
- return;

- /* If get_user_pages fails, we stop sending writes. Write length
- * accounting is handled by nfs_direct_write_result(). */
-out_err:
- nfs_release_user_pages(data->pagevec, result);
-
- list_add(&data->pages, list);
- while (!list_empty(list)) {
- data = list_entry(list->next, struct nfs_write_data, pages);
- list_del(&data->pages);
- nfs_writedata_free(data);
- if (nfs_direct_dec_outstanding(dreq))
- nfs_direct_write_complete(dreq, inode);
- }
+ if (nfs_direct_dec_outstanding(dreq))
+ nfs_direct_write_complete(dreq, inode);
+
+ return started;
}

static ssize_t nfs_direct_write(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos)
{
- ssize_t result;
+ ssize_t result = 0;
sigset_t oldset;
struct inode *inode = iocb->ki_filp->f_mapping->host;
struct rpc_clnt *clnt = NFS_CLIENT(inode);
@@ -770,9 +696,11 @@ static ssize_t nfs_direct_write(struct k
size_t wsize = NFS_SERVER(inode)->wsize;
int sync = 0;

- dreq = nfs_direct_write_alloc(count, wsize);
+ dreq = nfs_direct_req_alloc();
if (!dreq)
return -ENOMEM;
+ nfs_alloc_commit_data(dreq);
+
if (dreq->commit_data == NULL || count < wsize)
sync = FLUSH_STABLE;

@@ -786,8 +714,8 @@ static ssize_t nfs_direct_write(struct k
nfs_begin_data_update(inode);

rpc_clnt_sigmask(clnt, &oldset);
- nfs_direct_write_schedule(dreq, user_addr, count, pos, sync);
- result = nfs_direct_wait(dreq);
+ if (nfs_direct_write_schedule(dreq, user_addr, count, pos, sync))
+ result = nfs_direct_wait(dreq);
rpc_clnt_sigunmask(clnt, &oldset);

return result;

2006-05-19 18:02:07

by Lever, Charles

Subject: [PATCH 2/6] nfs: remove user_addr and user_count from nfs_direct_req

These fields won't fit the iovec API, and are no longer used for anything
but passing a parameter to nfs_direct_read/write_schedule. Make the
parameters explicit, and remove the fields from nfs_direct_req.

Signed-off-by: Chuck Lever <[email protected]>
---

fs/nfs/direct.c | 29 ++++++++---------------------
1 files changed, 8 insertions(+), 21 deletions(-)

diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 0fde4bf..aef0b98 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -73,9 +73,6 @@ struct nfs_direct_req {
struct nfs_open_context *ctx; /* file open context info */
struct kiocb * iocb; /* controlling i/o request */
struct inode * inode; /* target file of i/o */
- unsigned long user_addr; /* location of user's buffer */
- size_t user_count; /* total bytes to move */
- loff_t pos; /* starting offset in file */
struct page ** pages; /* pages in our buffer */
unsigned int npages; /* count of pages */

@@ -331,19 +328,17 @@ static const struct rpc_call_ops nfs_rea
* For each nfs_read_data struct that was allocated on the list, dispatch
* an NFS READ operation
*/
-static void nfs_direct_read_schedule(struct nfs_direct_req *dreq)
+static void nfs_direct_read_schedule(struct nfs_direct_req *dreq, unsigned long user_addr, size_t count, loff_t pos)
{
struct nfs_open_context *ctx = dreq->ctx;
struct inode *inode = ctx->dentry->d_inode;
struct list_head *list = &dreq->list;
struct page **pages = dreq->pages;
- size_t count = dreq->user_count;
- loff_t pos = dreq->pos;
size_t rsize = NFS_SERVER(inode)->rsize;
unsigned int curpage, pgbase;

curpage = 0;
- pgbase = dreq->user_addr & ~PAGE_MASK;
+ pgbase = user_addr & ~PAGE_MASK;
do {
struct nfs_read_data *data;
size_t bytes;
@@ -407,9 +402,6 @@ static ssize_t nfs_direct_read(struct ki
if (!dreq)
return -ENOMEM;

- dreq->user_addr = user_addr;
- dreq->user_count = count;
- dreq->pos = pos;
dreq->pages = pages;
dreq->npages = nr_pages;
dreq->inode = inode;
@@ -419,7 +411,7 @@ static ssize_t nfs_direct_read(struct ki

nfs_add_stats(inode, NFSIOS_DIRECTREADBYTES, count);
rpc_clnt_sigmask(clnt, &oldset);
- nfs_direct_read_schedule(dreq);
+ nfs_direct_read_schedule(dreq, user_addr, count, pos);
result = nfs_direct_wait(dreq);
rpc_clnt_sigunmask(clnt, &oldset);

@@ -521,8 +513,8 @@ static void nfs_direct_commit_schedule(s
data->cred = dreq->ctx->cred;

data->args.fh = NFS_FH(data->inode);
- data->args.offset = dreq->pos;
- data->args.count = dreq->user_count;
+ data->args.offset = 0;
+ data->args.count = 0;
data->res.count = 0;
data->res.fattr = &data->fattr;
data->res.verf = &data->verf;
@@ -680,19 +672,17 @@ static const struct rpc_call_ops nfs_wri
* For each nfs_write_data struct that was allocated on the list, dispatch
* an NFS WRITE operation
*/
-static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, int sync)
+static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, unsigned long user_addr, size_t count, loff_t pos, int sync)
{
struct nfs_open_context *ctx = dreq->ctx;
struct inode *inode = ctx->dentry->d_inode;
struct list_head *list = &dreq->list;
struct page **pages = dreq->pages;
- size_t count = dreq->user_count;
- loff_t pos = dreq->pos;
size_t wsize = NFS_SERVER(inode)->wsize;
unsigned int curpage, pgbase;

curpage = 0;
- pgbase = dreq->user_addr & ~PAGE_MASK;
+ pgbase = user_addr & ~PAGE_MASK;
do {
struct nfs_write_data *data;
size_t bytes;
@@ -761,9 +751,6 @@ static ssize_t nfs_direct_write(struct k
if (dreq->commit_data == NULL || count < wsize)
sync = FLUSH_STABLE;

- dreq->user_addr = user_addr;
- dreq->user_count = count;
- dreq->pos = pos;
dreq->pages = pages;
dreq->npages = nr_pages;
dreq->inode = inode;
@@ -776,7 +763,7 @@ static ssize_t nfs_direct_write(struct k
nfs_begin_data_update(inode);

rpc_clnt_sigmask(clnt, &oldset);
- nfs_direct_write_schedule(dreq, sync);
+ nfs_direct_write_schedule(dreq, user_addr, count, pos, sync);
result = nfs_direct_wait(dreq);
rpc_clnt_sigunmask(clnt, &oldset);

2006-05-19 18:08:12

by Andrew Morton

Subject: Re: [PATCH 1/6] nfs: "open code" the NFS direct write rescheduler

Chuck Lever <[email protected]> wrote:
>
> + * Prevent I/O completion while we're still rescheduling
> + */
> + dreq->outstanding++;
> +

No locking.

> dreq->count = 0;
> + list_for_each(pos, &dreq->rewrite_list) {
> + struct nfs_write_data *data =
> + list_entry(pos, struct nfs_write_data, pages);
> +
> + spin_lock(&dreq->lock);
> + dreq->outstanding++;
> + spin_unlock(&dreq->lock);

Locking.

Deliberate?

2006-05-19 18:15:05

by Andrew Morton

Subject: Re: [PATCH 3/6] nfs: Eliminate nfs_get_user_pages()

Chuck Lever <[email protected]> wrote:
>
> Neil Brown observed that the kmalloc() in nfs_get_user_pages() is more
> likely to fail if the I/O is large enough to require the allocation of more
> than a single page to keep track of all the pinned pages in the user's
> buffer.
>
> Instead of tracking one large page array per dreq/iocb, track pages per
> nfs_read/write_data, just like the cached I/O path does. An array for
> pages is already allocated for us by nfs_readdata_alloc() (and the write
> and commit equivalents).
>
> This is also required for adding support for vectored I/O to the NFS direct
> I/O path.
>
> The original reason to pin the user buffer and allocate all the NFS data
> structures before trying to schedule I/O was to ensure all needed resources
> are allocated on the client before starting to send requests. This reduces
> the chance that resource exhaustion on the client will cause a short read
> or write.
>
> On the other hand, for an application making very large I/O requests,
> this means that it will be nearly impossible for the application
> to make forward progress on a resource-limited client.
>
> Thus, moving the buffer pinning functionality into the I/O scheduling
> loops should be good for scalability. The next patch will do the same for
> NFS data structure allocation.
>
> +static void nfs_release_user_pages(struct page **pages, int npages)
> {
> - int result = -ENOMEM;
> - unsigned long page_count;
> - size_t array_size;
> -
> - page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
> - page_count -= user_addr >> PAGE_SHIFT;
> -
> - array_size = (page_count * sizeof(struct page *));
> - *pages = kmalloc(array_size, GFP_KERNEL);
> - if (*pages) {
> - down_read(&current->mm->mmap_sem);
> - result = get_user_pages(current, current->mm, user_addr,
> - page_count, (rw == READ), 0,
> - *pages, NULL);
> - up_read(&current->mm->mmap_sem);
> - if (result != page_count) {
> - /*
> - * If we got fewer pages than expected from
> - * get_user_pages(), the user buffer runs off the
> - * end of a mapping; return EFAULT.
> - */
> - if (result >= 0) {
> - nfs_free_user_pages(*pages, result, 0);
> - result = -EFAULT;
> - } else
> - kfree(*pages);
> - *pages = NULL;
> - }
> - }
> - return result;
> + int i;
> + for (i = 0; i < npages; i++)
> + page_cache_release(pages[i]);
> }

If `npages' is negative, this does the right thing.

> + result = get_user_pages(current, current->mm, user_addr,
> + data->npages, 1, 0, data->pagevec, NULL);
> + up_read(&current->mm->mmap_sem);
> + if (unlikely(result < data->npages))
> + goto out_err;
> ...
> +out_err:
> + nfs_release_user_pages(data->pagevec, result);

And `npages' can indeed be negative.

So. No bug there, but the code is a little unobvious and fragile - if
someone were to alter a type then subtle bugs would happen.

Perhaps

if (result > 0)
nfs_release_user_pages(...);

would be cleaner. Or at least a loud comment in nfs_release_user_pages().
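
For example (just a sketch of such a comment, against the code quoted
above):

	static void nfs_release_user_pages(struct page **pages, int npages)
	{
		int i;
		/*
		 * N.B. npages may be a negative errno passed straight
		 * through from get_user_pages(); the signed comparison
		 * below then skips the loop entirely. That only holds
		 * as long as both 'i' and 'npages' stay signed.
		 */
		for (i = 0; i < npages; i++)
			page_cache_release(pages[i]);
	}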

2006-05-19 18:19:47

by Andrew Morton

Subject: Re: [PATCH 5/6] nfs: check all iov segments for correct memory access rights

Chuck Lever <[email protected]> wrote:
>
> +/*
> + * Check:
> + * 1. All bytes in the user buffers are properly accessible
> + * 2. The resulting number of bytes won't overflow ssize_t
> + */

hm.

> +static ssize_t check_access_ok(int type, const struct iovec *iov, unsigned long nr_segs)
> +{
> + ssize_t count = 0;
> + ssize_t retval = -EINVAL;
> + unsigned long seg;
> +
> + for (seg = 0; seg < nr_segs; seg++) {
> + void __user *buf = iov[seg].iov_base;
> + ssize_t len = (ssize_t) iov[seg].iov_len;
> +
> + if (len < 0) /* size_t not fitting an ssize_t .. */
> + goto out;

do_readv_writev() already checked for negative iov_len, and that's the more
appropriate place to do it, rather than duplicating it in each filesystem
(or forgetting to!)

So is this check really needed?
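
(From memory, the relevant loop in do_readv_writev() is roughly

	ssize_t len = (ssize_t)iov[seg].iov_len;

	if (len < 0)		/* size_t not fitting an ssize_t .. */
		goto out;
	tot_len += len;
	if (tot_len < 0)	/* maths overflow on the ssize_t */
		goto out;

not verbatim, but that is the shape of it.)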

> + if (unlikely(!access_ok(type, buf, len))) {
> + retval = -EFAULT;
> + goto out;
> + }

Now what's up here? Why does NFS, at this level, care about the page's
virtual address? get_user_pages() will handle that?

2006-05-19 18:24:08

by Badari Pulavarty

Subject: Re: [PATCH 5/6] nfs: check all iov segments for correct memory access rights

On Fri, 2006-05-19 at 14:00 -0400, Chuck Lever wrote:

>
> +/*
> + * Check:
> + * 1. All bytes in the user buffers are properly accessible
> + * 2. The resulting number of bytes won't overflow ssize_t
> + */
> +static ssize_t check_access_ok(int type, const struct iovec *iov, unsigned long nr_segs)
> +{
> + ssize_t count = 0;
> + ssize_t retval = -EINVAL;
> + unsigned long seg;
> +
> + for (seg = 0; seg < nr_segs; seg++) {
> + void __user *buf = iov[seg].iov_base;
> + ssize_t len = (ssize_t) iov[seg].iov_len;
> +
> + if (len < 0) /* size_t not fitting an ssize_t .. */
> + goto out;
> + if (unlikely(!access_ok(type, buf, len))) {
> + retval = -EFAULT;
> + goto out;
> + }
> + count += len;
> + if (count < 0) /* math overflow on the ssize_t */
> + goto out;
> + }
> + retval = count;
> +out:
> + return retval;
> +}
> +

Maybe move this to fs/read_write.c and export it, since we do the same
thing there also?

Thanks,
Badari

2006-05-19 18:37:04

by Chuck Lever

Subject: Re: [PATCH 1/6] nfs: "open code" the NFS direct write rescheduler

Andrew Morton wrote:
> Chuck Lever <[email protected]> wrote:
>> + * Prevent I/O completion while we're still rescheduling
>> + */
>> + dreq->outstanding++;
>> +
>
> No locking.
>
>> dreq->count = 0;
>> + list_for_each(pos, &dreq->rewrite_list) {
>> + struct nfs_write_data *data =
>> + list_entry(pos, struct nfs_write_data, pages);
>> +
>> + spin_lock(&dreq->lock);
>> + dreq->outstanding++;
>> + spin_unlock(&dreq->lock);
>
> Locking.
>
> Deliberate?

Yes. At the top of the loop, there is no outstanding I/O, so no locking
is needed while updating "outstanding." Inside the loop, we've
dispatched some I/O against "dreq" so locking is needed to ensure
outstanding is updated properly.

--
corporate: cel at netapp dot com
personal: chucklever at bigfoot dot com

2006-05-19 18:43:25

by Andrew Morton

Subject: Re: [PATCH 1/6] nfs: "open code" the NFS direct write rescheduler

Chuck Lever <[email protected]> wrote:
>
> Andrew Morton wrote:
> > Chuck Lever <[email protected]> wrote:
> >> + * Prevent I/O completion while we're still rescheduling
> >> + */
> >> + dreq->outstanding++;
> >> +
> >
> > No locking.
> >
> >> dreq->count = 0;
> >> + list_for_each(pos, &dreq->rewrite_list) {
> >> + struct nfs_write_data *data =
> >> + list_entry(pos, struct nfs_write_data, pages);
> >> +
> >> + spin_lock(&dreq->lock);
> >> + dreq->outstanding++;
> >> + spin_unlock(&dreq->lock);
> >
> > Locking.
> >
> > Deliberate?
>
> Yes. At the top of the loop, there is no outstanding I/O, so no locking
> is needed while updating "outstanding." Inside the loop, we've
> dispatched some I/O against "dreq" so locking is needed to ensure
> outstanding is updated properly.
>

OK. Well if I asked, then others will wonder about it. A comment would
cure that problem ;)

2006-05-19 18:46:33

by Chuck Lever

Subject: Re: [PATCH 5/6] nfs: check all iov segments for correct memory access rights

Andrew Morton wrote:
> Chuck Lever <[email protected]> wrote:
>> +static ssize_t check_access_ok(int type, const struct iovec *iov, unsigned long nr_segs)
>> +{
>> + ssize_t count = 0;
>> + ssize_t retval = -EINVAL;
>> + unsigned long seg;
>> +
>> + for (seg = 0; seg < nr_segs; seg++) {
>> + void __user *buf = iov[seg].iov_base;
>> + ssize_t len = (ssize_t) iov[seg].iov_len;
>> +
>> + if (len < 0) /* size_t not fitting an ssize_t .. */
>> + goto out;
>
> do_readv_writev() already checked for negative iov_len, and that's the more
> appropriate place to do it, rather than duplicating it in each filesystem
> (or forgetting to!)
>
> So is this check really needed?

Ok, didn't see that function before. Badari, is this checking still needed?

>> + if (unlikely(!access_ok(type, buf, len))) {
>> + retval = -EFAULT;
>> + goto out;
>> + }
>
> Now what's up here? Why does NFS, at this level, care about the page's
> virtual address? get_user_pages() will handle that?

Interesting point. That may be a historical artifact that can be
removed. I'll take a look.

--
corporate: cel at netapp dot com
personal: chucklever at bigfoot dot com

2006-05-19 18:56:23

by Chuck Lever

Subject: Re: [PATCH 1/6] nfs: "open code" the NFS direct write rescheduler

Andrew Morton wrote:
> Chuck Lever <[email protected]> wrote:
>> Andrew Morton wrote:
>>> Chuck Lever <[email protected]> wrote:
>>>> + * Prevent I/O completion while we're still rescheduling
>>>> + */
>>>> + dreq->outstanding++;
>>>> +
>>> No locking.
>>>
>>>> dreq->count = 0;
>>>> + list_for_each(pos, &dreq->rewrite_list) {
>>>> + struct nfs_write_data *data =
>>>> + list_entry(pos, struct nfs_write_data, pages);
>>>> +
>>>> + spin_lock(&dreq->lock);
>>>> + dreq->outstanding++;
>>>> + spin_unlock(&dreq->lock);
>>> Locking.
>>>
>>> Deliberate?
>> Yes. At the top of the loop, there is no outstanding I/O, so no locking
>> is needed while updating "outstanding." Inside the loop, we've
>> dispatched some I/O against "dreq" so locking is needed to ensure
>> outstanding is updated properly.
>>
>
> OK. Well if I asked, then others will wonder about it. A comment would
> cure that problem ;)

Or, I could code defensively and just add locking there too, even though
it is not needed. This path is not a performance path, and things could
change at some point so that my assumption is no longer valid.
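
If I keep the unlocked increment, a comment along these lines (a
sketch, not a patch) would document the assumption:

	/*
	 * No lock needed: no write RPCs have been dispatched yet,
	 * so nothing else can be looking at dreq->outstanding.
	 */
	dreq->outstanding++;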

--
corporate: cel at netapp dot com
personal: chucklever at bigfoot dot com

2006-05-19 19:18:06

by Chuck Lever

Subject: Re: [PATCH 3/6] nfs: Eliminate nfs_get_user_pages()

Andrew Morton wrote:
> Chuck Lever <[email protected]> wrote:
>> Neil Brown observed that the kmalloc() in nfs_get_user_pages() is more
>> likely to fail if the I/O is large enough to require the allocation of more
>> than a single page to keep track of all the pinned pages in the user's
>> buffer.
>>
>> Instead of tracking one large page array per dreq/iocb, track pages per
>> nfs_read/write_data, just like the cached I/O path does. An array for
>> pages is already allocated for us by nfs_readdata_alloc() (and the write
>> and commit equivalents).
>>
>> This is also required for adding support for vectored I/O to the NFS direct
>> I/O path.
>>
>> The original reason to pin the user buffer and allocate all the NFS data
>> structures before trying to schedule I/O was to ensure all needed resources
>> are allocated on the client before starting to send requests. This reduces
>> the chance that resource exhaustion on the client will cause a short read
>> or write.
>>
>> On the other hand, for an application making very large I/O requests,
>> this means that it will be nearly impossible for the application
>> to make forward progress on a resource-limited client.
>>
>> Thus, moving the buffer pinning functionality into the I/O scheduling
>> loops should be good for scalability. The next patch will do the same for
>> NFS data structure allocation.
>>
>> +static void nfs_release_user_pages(struct page **pages, int npages)
>> {
>> - int result = -ENOMEM;
>> - unsigned long page_count;
>> - size_t array_size;
>> -
>> - page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
>> - page_count -= user_addr >> PAGE_SHIFT;
>> -
>> - array_size = (page_count * sizeof(struct page *));
>> - *pages = kmalloc(array_size, GFP_KERNEL);
>> - if (*pages) {
>> - down_read(&current->mm->mmap_sem);
>> - result = get_user_pages(current, current->mm, user_addr,
>> - page_count, (rw == READ), 0,
>> - *pages, NULL);
>> - up_read(&current->mm->mmap_sem);
>> - if (result != page_count) {
>> - /*
>> - * If we got fewer pages than expected from
>> - * get_user_pages(), the user buffer runs off the
>> - * end of a mapping; return EFAULT.
>> - */
>> - if (result >= 0) {
>> - nfs_free_user_pages(*pages, result, 0);
>> - result = -EFAULT;
>> - } else
>> - kfree(*pages);
>> - *pages = NULL;
>> - }
>> - }
>> - return result;
>> + int i;
>> + for (i = 0; i < npages; i++)
>> + page_cache_release(pages[i]);
>> }
>
> If `npages' is negative, this does the right thing.
>
>> + result = get_user_pages(current, current->mm, user_addr,
>> + data->npages, 1, 0, data->pagevec, NULL);
>> + up_read(&current->mm->mmap_sem);
>> + if (unlikely(result < data->npages))
>> + goto out_err;
>> ...
>> +out_err:
>> + nfs_release_user_pages(data->pagevec, result);
>
> And `npages' can indeed be negative.

I fixed this by making all of these an "unsigned long".
get_user_pages() returns an unsigned long result, so all these
comparisons should always work correctly.

nfs_count_pages() now also returns an unsigned long, but I don't see how
it is possible for it to compute a negative value.

> So. No bug there, but the code is a little unobvious and fragile - if
> someone were to alter a type then subtle bugs would happen.
>
> Perhaps
>
> if (result > 0)
> nfs_release_user_pages(...);
>
> would be cleaner. Or at least a loud comment in nfs_release_user_pages().

--
corporate: cel at netapp dot com
personal: chucklever at bigfoot dot com

2006-05-19 19:36:47

by Chuck Lever

Subject: Re: [PATCH 5/6] nfs: check all iov segments for correct memory access rights

Andrew Morton wrote:
>> + if (unlikely(!access_ok(type, buf, len))) {
>> + retval = -EFAULT;
>> + goto out;
>> + }
>
> Now what's up here? Why does NFS, at this level, care about the page's
> virtual address? get_user_pages() will handle that?

I guess I'm not clear on what behavior is desired for scatter/gather if
one of the segments in an iov fails.

If one of the iov's will cause an EFAULT, how is that reported back to
the application, and what happens to the I/O being requested in the
other segments of the vector? When do we use an "all or nothing"
semantic, and when is it OK for some segments to fail?

--
corporate: cel at netapp dot com
personal: chucklever at bigfoot dot com

2006-05-19 20:04:30

by Andrew Morton

Subject: Re: [PATCH 5/6] nfs: check all iov segments for correct memory access rights

Chuck Lever <[email protected]> wrote:
>
> Andrew Morton wrote:
> >> + if (unlikely(!access_ok(type, buf, len))) {
> >> + retval = -EFAULT;
> >> + goto out;
> >> + }
> >
> > Now what's up here? Why does NFS, at this level, care about the page's
> > virtual address? get_user_pages() will handle that?
>
> I guess I'm not clear on what behavior is desired for scatter/gather if
> one of the segments in an iov fails.
>
> If one of the iov's will cause an EFAULT, how is that reported back to
> the application,

If nothing has yet been transferred to/from userspace, return -EFAULT.

If something has been transferred, return the number of bytes transferred.

> and what happens to the I/O being requested in the
> other segments of the vector?

The filesystem driver needs to handle it somehow.

> When do we use an "all or nothing"
> semantic, and when is it OK for some segments to fail?

Actually, fs/direct-io.c cheats and doesn't implement the correct
semantics. It returns either all-bytes-transferred or -EFOO. The way I
justify that is to point out that returning a partial transfer count
doesn't make a lot of sense when the I/Os could complete in any order -
yes, we know how much data got transferred, but we don't know whereabouts
in the user's memory that data ended up. So the user cannot trust _any_ of
it.

NFS direct-io can do the same.

But access_ok() isn't sufficient. All it tells you is that the virtual
address is a legal one for an application. But we could still get EFAULT
when trying to access it.
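
(Conceptually, all access_ok() does on most architectures is a range
check, something like

	/* a sketch, not the literal macro */
	ok = (unsigned long)buf + len <= current_thread_info()->addr_limit.seg;

whether those addresses are actually mapped is only discovered when
get_user_pages() or the copy itself faults.)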

2006-05-22 11:51:44

by Andi Kleen

Subject: Re: [PATCH 5/6] nfs: check all iov segments for correct memory access rights

Chuck Lever <[email protected]> writes:

> Add Badari's function to check access for all the segments in a passed-in
> iov. We can use the total byte count later.

It seems bogus to me because there is no big reason the access_ok
can't be done later together with the real access. After all the
real access has to check anyway to handle unmapped pages.

Two-pass checking is more error prone than single pass.

-Andi