dio: clean up completion phase of direct_io_worker()
There have been a lot of bugs recently due to the way direct_io_worker() tries
to decide how to finish direct IO operations. In the worst examples it has
failed to call aio_complete() at all (hang) or called it too many times (oops).
This set of patches cleans up the completion phase with the goal of removing
the complexity that led to these bugs. We end up with one path that
calculates the result of the operation after all of the bios have completed.
We decide when to generate a result of the operation using that path based on
the final release of a refcount on the dio structure.
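Roughly, the end state looks like this (a sketch of the pattern only; the
exact code and names are in the patches below). Both submission and bio
completion funnel through one point, and whoever drops the final reference
computes the result:

	/* the last reference dropped decides the fate of the op */
	if (atomic_dec_and_test(&dio->refcount)) {
		ret = dio_complete(dio, offset, ret);
		kfree(dio);
	}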
I tried to progress towards the final state in steps that were relatively easy
to understand. Each step should compile but I only tested the final result of
having all the patches applied.
The patches result in a slight net decrease in code and binary size:
2.6.18-rc4-dio-cleanup/fs/direct-io.c | 8
2.6.18-rc5-dio-cleanup/fs/direct-io.c | 94 +++++------
2.6.18-rc5-dio-cleanup/mm/filemap.c | 4
fs/direct-io.c | 273 ++++++++++++++--------------------
4 files changed, 159 insertions(+), 220 deletions(-)
text data bss dec hex filename
2592385 450996 210296 3253677 31a5ad vmlinux.before
2592113 450980 210296 3253389 31a48d vmlinux.after
The patches pass light testing with aio-stress, the direct IO tests I could
manage to get running in LTP, and some home-brew functional tests. It's still
making its way through stress testing; I don't think it should be merged until
we hear back from that more rigorous testing.
I hoped to get some feedback (and maybe volunteers for testing!) by sending the
patches out before waiting for the stress tests.
- z
dio: centralize completion in dio_complete()
The mechanics which decide the result of a direct IO operation were duplicated
in the sync and async paths.
The async path didn't check page_errors which can manifest as silently
returning success when the final pointer in an operation faults and its
matching file region is filled with zeros.
The sync path and async path differed in whether they passed errors to the
caller's dio->end_io operation. The async path was passing errors to it which
trips an assertion in XFS, though it is apparently harmless.
This centralizes the completion phase of dio ops in one place. AIO will now
return EFAULT consistently and all paths fall back to the previously sync
behaviour of passing the number of bytes 'transferred' to the dio->end_io
callback, regardless of errors.
dio_await_completion() doesn't have to propagate EIO from non-uptodate
bios now that it's being propagated through dio_complete() via dio->io_error.
This lets it return void, which simplifies its sole caller.
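With this patch the completion call sites collapse to a single pattern
(a sketch, not a literal hunk from the diff below):

	ret = dio_complete(dio, offset, ret);
	/*
	 * ret is now the first non-zero of: the caller's error,
	 * dio->page_errors, dio->io_error, or else the bytes
	 * transferred (short reads already clamped at i_size).
	 */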
Signed-off-by: Zach Brown <[email protected]>
---
fs/direct-io.c | 94 +++++++++++++++++++++--------------------------
1 file changed, 42 insertions(+), 52 deletions(-)
Index: 2.6.18-rc6-dio-cleanup/fs/direct-io.c
===================================================================
--- 2.6.18-rc6-dio-cleanup.orig/fs/direct-io.c
+++ 2.6.18-rc6-dio-cleanup/fs/direct-io.c
@@ -209,19 +209,46 @@ static struct page *dio_get_page(struct
return dio->pages[dio->head++];
}
-/*
- * Called when all DIO BIO I/O has been completed - let the filesystem
- * know, if it registered an interest earlier via get_block. Pass the
- * private field of the map buffer_head so that filesystems can use it
- * to hold additional state between get_block calls and dio_complete.
+/**
+ * dio_complete() - called when all DIO BIO I/O has been completed
+ * @offset: the byte offset in the file of the completed operation
+ *
+ * This releases locks as dictated by the locking type, lets interested parties
+ * know that a DIO operation has completed, and calculates the resulting return
+ * code for the operation.
+ *
+ * It lets the filesystem know if it registered an interest earlier via
+ * get_block. Pass the private field of the map buffer_head so that
+ * filesystems can use it to hold additional state between get_block calls and
+ * dio_complete.
*/
-static void dio_complete(struct dio *dio, loff_t offset, ssize_t bytes)
+static int dio_complete(struct dio *dio, loff_t offset, int ret)
{
+ ssize_t transferred = 0;
+
+ if (dio->result) {
+ transferred = dio->result;
+
+ /* Check for short read case */
+ if ((dio->rw == READ) && ((offset + transferred) > dio->i_size))
+ transferred = dio->i_size - offset;
+ }
+
if (dio->end_io && dio->result)
- dio->end_io(dio->iocb, offset, bytes, dio->map_bh.b_private);
+ dio->end_io(dio->iocb, offset, transferred,
+ dio->map_bh.b_private);
if (dio->lock_type == DIO_LOCKING)
/* lockdep: non-owner release */
up_read_non_owner(&dio->inode->i_alloc_sem);
+
+ if (ret == 0)
+ ret = dio->page_errors;
+ if (ret == 0)
+ ret = dio->io_error;
+ if (ret == 0)
+ ret = transferred;
+
+ return ret;
}
/*
@@ -235,8 +262,7 @@ static void finished_one_bio(struct dio
spin_lock_irqsave(&dio->bio_lock, flags);
if (dio->bio_count == 1) {
if (dio->is_async) {
- ssize_t transferred;
- loff_t offset;
+ int ret;
/*
* Last reference to the dio is going away.
@@ -244,24 +270,12 @@ static void finished_one_bio(struct dio
*/
spin_unlock_irqrestore(&dio->bio_lock, flags);
- /* Check for short read case */
- transferred = dio->result;
- offset = dio->iocb->ki_pos;
-
- if ((dio->rw == READ) &&
- ((offset + transferred) > dio->i_size))
- transferred = dio->i_size - offset;
-
- /* check for error in completion path */
- if (dio->io_error)
- transferred = dio->io_error;
-
- dio_complete(dio, offset, transferred);
+ ret = dio_complete(dio, dio->iocb->ki_pos, 0);
/* Complete AIO later if falling back to buffered i/o */
if (dio->result == dio->size ||
((dio->rw == READ) && dio->result)) {
- aio_complete(dio->iocb, transferred, 0);
+ aio_complete(dio->iocb, ret, 0);
kfree(dio);
return;
} else {
@@ -433,10 +447,8 @@ static int dio_bio_complete(struct dio *
/*
* Wait on and process all in-flight BIOs.
*/
-static int dio_await_completion(struct dio *dio)
+static void dio_await_completion(struct dio *dio)
{
- int ret = 0;
-
if (dio->bio)
dio_bio_submit(dio);
@@ -447,13 +459,9 @@ static int dio_await_completion(struct d
*/
while (dio->bio_count) {
struct bio *bio = dio_await_one(dio);
- int ret2;
-
- ret2 = dio_bio_complete(dio, bio);
- if (ret == 0)
- ret = ret2;
+ /* io errors are propogated through dio->io_error */
+ dio_bio_complete(dio, bio);
}
- return ret;
}
/*
@@ -1119,28 +1127,10 @@ direct_io_worker(int rw, struct kiocb *i
kfree(dio);
}
} else {
- ssize_t transferred = 0;
-
finished_one_bio(dio);
- ret2 = dio_await_completion(dio);
- if (ret == 0)
- ret = ret2;
- if (ret == 0)
- ret = dio->page_errors;
- if (dio->result) {
- loff_t i_size = i_size_read(inode);
+ dio_await_completion(dio);
- transferred = dio->result;
- /*
- * Adjust the return value if the read crossed a
- * non-block-aligned EOF.
- */
- if (rw == READ && (offset + transferred > i_size))
- transferred = i_size - offset;
- }
- dio_complete(dio, offset, transferred);
- if (ret == 0)
- ret = transferred;
+ ret = dio_complete(dio, offset, ret);
/* We could have also come here on an AIO file extend */
if (!is_sync_kiocb(iocb) && (rw & WRITE) &&
dio: only call aio_complete() after returning -EIOCBQUEUED
The only time it is safe to call aio_complete() is when the ->ki_retry function
returns -EIOCBQUEUED to the AIO core. direct_io_worker() has historically done
this by relying on its caller to translate positive return codes into
-EIOCBQUEUED for the aio case. It did this by trying to keep conditionals in
sync. direct_io_worker() knew when finished_one_bio() was going to call
aio_complete(). It would reverse the test and wait and free the dio in the
cases it thought that finished_one_bio() wasn't going to.
Not surprisingly, it ended up getting it wrong. 'ret' could be a negative
errno from the submission path but it failed to communicate this to
finished_one_bio(). direct_io_worker() would return < 0, its callers wouldn't
raise -EIOCBQUEUED, and the AIO core would call aio_complete(). Later,
finished_one_bio()'s tests wouldn't reflect the error and aio_complete() would
be called a second time, which can manifest as an oops.
The previous cleanups have whittled the sync and async completion paths down to
the point where we can collapse them and clearly reassert the invariant that we
must only call aio_complete() after returning -EIOCBQUEUED. direct_io_worker()
will only return -EIOCBQUEUED when it is not the last to drop the dio refcount
and the aio bio completion path will only call aio_complete() when it is the
last to drop the dio refcount. direct_io_worker() can ensure that it is the
last to drop the reference count by waiting for bios to drain. It does this
for sync ops, of course, for partial dio writes that must fall back to
buffered IO, and for aio ops that saw errors during submission.
This means that operations that end up waiting, even if they were issued as aio
ops, will not call aio_complete() from dio. Instead we return the return code
of the operation and let the aio core call aio_complete(). This is purposely
done to fix a bug where AIO DIO file extensions would call aio_complete()
before their callers have a chance to update i_size.
Now that direct_io_worker() is explicitly returning -EIOCBQUEUED its callers no
longer have to translate for it.
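Put another way, a caller of the direct IO path can now key off the return
code alone (a sketch of the contract, not code from the patch):

	ret = direct_io_worker(rw, iocb, inode, iov, offset, nr_segs,
			       blkbits, get_block, end_io, dio);
	if (ret == -EIOCBQUEUED) {
		/*
		 * bios are still in flight; the last bio completion
		 * will call aio_complete() on our behalf.
		 */
		return ret;
	}
	/*
	 * We were the last to drop the dio refcount: ret is the final
	 * byte count or errno, and aio_complete() was never called.
	 */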
Signed-off-by: Zach Brown <[email protected]>
---
fs/direct-io.c | 92 ++++++++++++++++++-----------------------------
mm/filemap.c | 4 --
2 files changed, 36 insertions(+), 60 deletions(-)
Index: 2.6.18-rc6-dio-cleanup/fs/direct-io.c
===================================================================
--- 2.6.18-rc6-dio-cleanup.orig/fs/direct-io.c
+++ 2.6.18-rc6-dio-cleanup/fs/direct-io.c
@@ -225,6 +225,15 @@ static int dio_complete(struct dio *dio,
{
ssize_t transferred = 0;
+ /*
+ * AIO submission can race with bio completion to get here while
+ * expecting to have the last io completed by bio completion.
+ * In that case -EIOCBQUEUED is in fact not an error we want
+ * to preserve through this call.
+ */
+ if (ret == -EIOCBQUEUED)
+ ret = 0;
+
if (dio->result) {
transferred = dio->result;
@@ -250,24 +259,6 @@ static int dio_complete(struct dio *dio,
return ret;
}
-/*
- * Called when a BIO has been processed. If the count goes to zero then IO is
- * complete and we can signal this to the AIO layer.
- */
-static void dio_complete_aio(struct dio *dio)
-{
- int ret;
-
- ret = dio_complete(dio, dio->iocb->ki_pos, 0);
-
- /* Complete AIO later if falling back to buffered i/o */
- if (dio->result == dio->size ||
- ((dio->rw == READ) && dio->result)) {
- aio_complete(dio->iocb, ret, 0);
- kfree(dio);
- }
-}
-
static int dio_bio_complete(struct dio *dio, struct bio *bio);
/*
* Asynchronous IO callback.
@@ -289,8 +280,11 @@ static int dio_bio_end_aio(struct bio *b
if (remaining == 1 && waiter_holds_ref)
wake_up_process(dio->waiter);
- if (remaining == 0)
- dio_complete_aio(dio);
+ if (remaining == 0) {
+ int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+ aio_complete(dio->iocb, ret, 0);
+ kfree(dio);
+ }
return 0;
}
@@ -1074,47 +1068,33 @@ direct_io_worker(int rw, struct kiocb *i
mutex_unlock(&dio->inode->i_mutex);
/*
- * OK, all BIOs are submitted, so we can decrement bio_count to truly
- * reflect the number of to-be-processed BIOs.
- */
- if (dio->is_async) {
- int should_wait = 0;
-
- if (dio->result < dio->size && (rw & WRITE)) {
- dio->waiter = current;
- should_wait = 1;
- }
- if (ret == 0)
- ret = dio->result;
-
- if (should_wait)
- dio_await_completion(dio);
-
- /* this can free the dio */
- if (atomic_dec_and_test(&dio->refcount))
- dio_complete_aio(dio);
+ * The only time we want to leave bios in flight is when a successful
+ * partial aio read or full aio write have been setup. In that case
+ * bio completion will call aio_complete. The only time it's safe to
+ * call aio_complete is when we return -EIOCBQUEUED, so we key on that.
+ * This had *better* be the only place that raises -EIOCBQUEUED.
+ */
+ BUG_ON(ret == -EIOCBQUEUED);
+ if (dio->is_async && ret == 0 && dio->result &&
+ ((rw & READ) || (dio->result == dio->size)))
+ ret = -EIOCBQUEUED;
- if (should_wait)
- kfree(dio);
- } else {
+ if (ret != -EIOCBQUEUED)
dio_await_completion(dio);
+ /*
+ * Sync will always be dropping the final ref and completing the
+ * operation. AIO can if it was a broken operation described above
+ * or in fact if all the bios race to complete before we get here.
+ * In that case dio_complete() translates the EIOCBQUEUED into
+ * the proper return code that the caller will hand to aio_complete().
+ */
+ if (atomic_dec_and_test(&dio->refcount)) {
ret = dio_complete(dio, offset, ret);
+ kfree(dio);
+ } else
+ BUG_ON(ret != -EIOCBQUEUED);
- /* We could have also come here on an AIO file extend */
- if (!is_sync_kiocb(iocb) && (rw & WRITE) &&
- ret >= 0 && dio->result == dio->size)
- /*
- * For AIO writes where we have completed the
- * i/o, we have to mark the the aio complete.
- */
- aio_complete(iocb, ret, 0);
-
- if (atomic_dec_and_test(&dio->refcount))
- kfree(dio);
- else
- BUG();
- }
return ret;
}
Index: 2.6.18-rc6-dio-cleanup/mm/filemap.c
===================================================================
--- 2.6.18-rc6-dio-cleanup.orig/mm/filemap.c
+++ 2.6.18-rc6-dio-cleanup/mm/filemap.c
@@ -1175,8 +1175,6 @@ __generic_file_aio_read(struct kiocb *io
if (pos < size) {
retval = generic_file_direct_IO(READ, iocb,
iov, pos, nr_segs);
- if (retval > 0 && !is_sync_kiocb(iocb))
- retval = -EIOCBQUEUED;
if (retval > 0)
*ppos = pos + retval;
}
@@ -2053,8 +2051,6 @@ generic_file_direct_write(struct kiocb *
if (err < 0)
written = err;
}
- if (written == count && !is_sync_kiocb(iocb))
- written = -EIOCBQUEUED;
return written;
}
EXPORT_SYMBOL(generic_file_direct_write);
dio: remove duplicate bio wait code
Now that we have a single refcount and waiting path we can reuse it in the
async 'should_wait' path. It continues to rely on the fragile link between the
conditional in dio_complete_aio() which decides to complete the AIO and the
conditional in direct_io_worker() which decides to wait and free.
By waiting before dropping the reference we stop dio_bio_end_aio() from calling
dio_complete_aio() which used to wake up the waiter after seeing the reference
count drop to 0. We hoist this wake up into dio_bio_end_aio() which now
notices when it's left a single remaining reference that is held by the waiter.
Signed-off-by: Zach Brown <[email protected]>
---
fs/direct-io.c | 41 ++++++++++++-----------------------------
1 file changed, 12 insertions(+), 29 deletions(-)
Index: 2.6.18-rc6-dio-cleanup/fs/direct-io.c
===================================================================
--- 2.6.18-rc6-dio-cleanup.orig/fs/direct-io.c
+++ 2.6.18-rc6-dio-cleanup/fs/direct-io.c
@@ -256,7 +256,6 @@ static int dio_complete(struct dio *dio,
*/
static void dio_complete_aio(struct dio *dio)
{
- unsigned long flags;
int ret;
ret = dio_complete(dio, dio->iocb->ki_pos, 0);
@@ -266,14 +265,6 @@ static void dio_complete_aio(struct dio
((dio->rw == READ) && dio->result)) {
aio_complete(dio->iocb, ret, 0);
kfree(dio);
- } else {
- /*
- * Falling back to buffered
- */
- spin_lock_irqsave(&dio->bio_lock, flags);
- if (dio->waiter)
- wake_up_process(dio->waiter);
- spin_unlock_irqrestore(&dio->bio_lock, flags);
}
}
@@ -284,6 +275,8 @@ static int dio_bio_complete(struct dio *
static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
{
struct dio *dio = bio->bi_private;
+ int waiter_holds_ref = 0;
+ int remaining;
if (bio->bi_size)
return 1;
@@ -291,7 +284,12 @@ static int dio_bio_end_aio(struct bio *b
/* cleanup the bio */
dio_bio_complete(dio, bio);
- if (atomic_dec_and_test(&dio->refcount))
+ waiter_holds_ref = !!dio->waiter;
+ remaining = atomic_sub_return(1, (&dio->refcount));
+ if (remaining == 1 && waiter_holds_ref)
+ wake_up_process(dio->waiter);
+
+ if (remaining == 0)
dio_complete_aio(dio);
return 0;
@@ -1089,30 +1087,15 @@ direct_io_worker(int rw, struct kiocb *i
if (ret == 0)
ret = dio->result;
+ if (should_wait)
+ dio_await_completion(dio);
+
/* this can free the dio */
if (atomic_dec_and_test(&dio->refcount))
dio_complete_aio(dio);
- if (should_wait) {
- unsigned long flags;
- /*
- * Wait for already issued I/O to drain out and
- * release its references to user-space pages
- * before returning to fallback on buffered I/O
- */
-
- spin_lock_irqsave(&dio->bio_lock, flags);
- set_current_state(TASK_UNINTERRUPTIBLE);
- while (atomic_read(&dio->refcount)) {
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- io_schedule();
- spin_lock_irqsave(&dio->bio_lock, flags);
- set_current_state(TASK_UNINTERRUPTIBLE);
- }
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- set_current_state(TASK_RUNNING);
+ if (should_wait)
kfree(dio);
- }
} else {
dio_await_completion(dio);
dio: call blk_run_address_space() once per op
We only need to call blk_run_address_space() once after all the bios for the
direct IO op have been submitted. This removes the chance of calling
blk_run_address_space() after spurious wake ups as the sync path waits for bios
to drain. It's also one less difference between the sync and async paths.
In the process we remove a redundant dio_bio_submit() that its caller had
already performed.
Signed-off-by: Zach Brown <[email protected]>
---
fs/direct-io.c | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
Index: 2.6.18-rc6-dio-cleanup/fs/direct-io.c
===================================================================
--- 2.6.18-rc6-dio-cleanup.orig/fs/direct-io.c
+++ 2.6.18-rc6-dio-cleanup/fs/direct-io.c
@@ -403,7 +403,6 @@ static struct bio *dio_await_one(struct
if (dio->bio_list == NULL) {
dio->waiter = current;
spin_unlock_irqrestore(&dio->bio_lock, flags);
- blk_run_address_space(dio->inode->i_mapping);
io_schedule();
spin_lock_irqsave(&dio->bio_lock, flags);
dio->waiter = NULL;
@@ -449,9 +448,6 @@ static int dio_bio_complete(struct dio *
*/
static void dio_await_completion(struct dio *dio)
{
- if (dio->bio)
- dio_bio_submit(dio);
-
/*
* The bio_lock is not held for the read of bio_count.
* This is ok since it is the dio_bio_complete() that changes
@@ -1077,6 +1073,9 @@ direct_io_worker(int rw, struct kiocb *i
if (dio->bio)
dio_bio_submit(dio);
+ /* All IO is now issued, send it on its way */
+ blk_run_address_space(inode->i_mapping);
+
/*
* It is possible that, we return short IO due to end of file.
* In that case, we need to release all the pages we got hold on.
@@ -1105,7 +1104,6 @@ direct_io_worker(int rw, struct kiocb *i
if (ret == 0)
ret = dio->result;
finished_one_bio(dio); /* This can free the dio */
- blk_run_address_space(inode->i_mapping);
if (should_wait) {
unsigned long flags;
/*
There was a 3/5 but the bogofilter decided it was spam. It made it into
the linux-aio archive:
http://marc.theaimsgroup.com/?l=linux-aio&m=115750084710650&w=2
What should I have done to avoid the spam regexes and what should I do
now that I have a patch that makes them angry?
- z
On Tue, Sep 05, 2006 at 09:35:37PM -0700, Zach Brown wrote:
>
> There was a 3/5 but the bogofilter decided it was spam. It made it into
> the linux-aio archive:
>
> http://marc.theaimsgroup.com/?l=linux-aio&m=115750084710650&w=2
>
> What should I have done to avoid the spam regexes and what should I do
> now that I have a patch that makes them angry?
I don't know. IMHO, bogofilter should only add a header so that people
who want to filter can, and those who don't will get all the
messages. Spam has never been a real problem for me on LKML, but loss
of messages and patches certainly will be. I would rather have the choice
to not filter anything :-/
Willy
On Tue, 2006-09-05 at 21:35 -0700, Zach Brown wrote:
> What should I have done to avoid the spam regexes and what should I do
> now that I have a patch that makes them angry?
Use language similar to the naughty stuff that is getting through?
Seriously though, a quote from Matti's lkml announcement:
IF we take it into use, it will start rejecting messages
at SMTP input phase, so if it rejects legitimate message,
you should get a bounce from your email provider's system.
(Or from zeus.kernel.org, which is vger's backup MX.)
In such case, send the bounce with some explanations to
<[email protected]> -- emails to that address
are explicitly excluded from all filtering!
dio: formalize bio counters as a dio reference count
Previously we had two confusing counts of bio progress. 'bio_count' was
decremented as bios were processed and freed by the dio core. It was used to
indicate final completion of the dio operation. 'bios_in_flight' reflected how
many bios were between submit_bio() and bio->end_io. It was used by the sync
path to decide when to wake up and finish completing bios and was ignored
by the async path.
This patch collapses the two notions into one notion of a dio reference count.
bios hold a dio reference when they're between submit_bio and bio->end_io.
Since bios_in_flight was only used in the sync path it is now equivalent
to dio->refcount - 1 which accounts for direct_io_worker() holding a
reference for the duration of the operation.
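The accounting boils down to this (a sketch using the names from the patch):

	atomic_set(&dio->refcount, 1);		/* direct_io_worker()'s ref */

	atomic_inc(&dio->refcount);		/* dio_bio_submit(), per bio */

	atomic_sub_return(1, &dio->refcount);	/* bio->end_io, per bio */

While direct_io_worker() holds its reference, the old bios_in_flight count is
just atomic_read(&dio->refcount) - 1.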
dio_bio_complete() -> finished_one_bio() was called from the sync path after
finding bios on the list that the bio->end_io function had deposited.
finished_one_bio() can not drop the dio reference on behalf of these bios now
because bio->end_io already has. The is_async test in finished_one_bio() meant
that it never actually did anything other than drop the bio_count for sync
callers. So we remove its refcount decrement, don't call it from
dio_bio_complete(), and hoist its call up into the async dio_bio_complete()
caller after an explicit refcount decrement. It is renamed dio_complete_aio()
to reflect the remaining work it actually does.
Signed-off-by: Zach Brown <[email protected]>
---
fs/direct-io.c | 140 ++++++++++++++++++++++-------------------------
1 file changed, 66 insertions(+), 74 deletions(-)
Index: 2.6.18-rc6-dio-cleanup/fs/direct-io.c
===================================================================
--- 2.6.18-rc6-dio-cleanup.orig/fs/direct-io.c
+++ 2.6.18-rc6-dio-cleanup/fs/direct-io.c
@@ -120,9 +120,8 @@ struct dio {
int page_errors; /* errno from get_user_pages() */
/* BIO completion state */
+ atomic_t refcount; /* direct_io_worker() and bios */
spinlock_t bio_lock; /* protects BIO fields below */
- int bio_count; /* nr bios to be completed */
- int bios_in_flight; /* nr bios in flight */
struct bio *bio_list; /* singly linked via bi_private */
struct task_struct *waiter; /* waiting task (NULL if none) */
@@ -255,44 +254,27 @@ static int dio_complete(struct dio *dio,
* Called when a BIO has been processed. If the count goes to zero then IO is
* complete and we can signal this to the AIO layer.
*/
-static void finished_one_bio(struct dio *dio)
+static void dio_complete_aio(struct dio *dio)
{
unsigned long flags;
+ int ret;
- spin_lock_irqsave(&dio->bio_lock, flags);
- if (dio->bio_count == 1) {
- if (dio->is_async) {
- int ret;
-
- /*
- * Last reference to the dio is going away.
- * Drop spinlock and complete the DIO.
- */
- spin_unlock_irqrestore(&dio->bio_lock, flags);
-
- ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+ ret = dio_complete(dio, dio->iocb->ki_pos, 0);
- /* Complete AIO later if falling back to buffered i/o */
- if (dio->result == dio->size ||
- ((dio->rw == READ) && dio->result)) {
- aio_complete(dio->iocb, ret, 0);
- kfree(dio);
- return;
- } else {
- /*
- * Falling back to buffered
- */
- spin_lock_irqsave(&dio->bio_lock, flags);
- dio->bio_count--;
- if (dio->waiter)
- wake_up_process(dio->waiter);
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- return;
- }
- }
+ /* Complete AIO later if falling back to buffered i/o */
+ if (dio->result == dio->size ||
+ ((dio->rw == READ) && dio->result)) {
+ aio_complete(dio->iocb, ret, 0);
+ kfree(dio);
+ } else {
+ /*
+ * Falling back to buffered
+ */
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ if (dio->waiter)
+ wake_up_process(dio->waiter);
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
}
- dio->bio_count--;
- spin_unlock_irqrestore(&dio->bio_lock, flags);
}
static int dio_bio_complete(struct dio *dio, struct bio *bio);
@@ -308,6 +290,10 @@ static int dio_bio_end_aio(struct bio *b
/* cleanup the bio */
dio_bio_complete(dio, bio);
+
+ if (atomic_dec_and_test(&dio->refcount))
+ dio_complete_aio(dio);
+
return 0;
}
@@ -329,8 +315,7 @@ static int dio_bio_end_io(struct bio *bi
spin_lock_irqsave(&dio->bio_lock, flags);
bio->bi_private = dio->bio_list;
dio->bio_list = bio;
- dio->bios_in_flight--;
- if (dio->waiter && dio->bios_in_flight == 0)
+ if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
wake_up_process(dio->waiter);
spin_unlock_irqrestore(&dio->bio_lock, flags);
return 0;
@@ -361,17 +346,15 @@ dio_bio_alloc(struct dio *dio, struct bl
* In the AIO read case we speculatively dirty the pages before starting IO.
* During IO completion, any of these pages which happen to have been written
* back will be redirtied by bio_check_pages_dirty().
+ *
+ * bios hold a dio reference between submit_bio and ->end_io.
*/
static void dio_bio_submit(struct dio *dio)
{
struct bio *bio = dio->bio;
- unsigned long flags;
bio->bi_private = dio;
- spin_lock_irqsave(&dio->bio_lock, flags);
- dio->bio_count++;
- dio->bios_in_flight++;
- spin_unlock_irqrestore(&dio->bio_lock, flags);
+ atomic_inc(&dio->refcount);
if (dio->is_async && dio->rw == READ)
bio_set_pages_dirty(bio);
submit_bio(dio->rw, bio);
@@ -389,18 +372,28 @@ static void dio_cleanup(struct dio *dio)
page_cache_release(dio_get_page(dio));
}
+static int wait_for_more_bios(struct dio *dio)
+{
+ assert_spin_locked(&dio->bio_lock);
+
+ return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
+}
+
/*
- * Wait for the next BIO to complete. Remove it and return it.
+ * Wait for the next BIO to complete. Remove it and return it. NULL is
+ * returned once all BIOs have been completed. This must only be called once
+ * all bios have been issued so that dio->refcount can only decrease. This
+ * requires that that the caller hold a reference on the dio.
*/
static struct bio *dio_await_one(struct dio *dio)
{
unsigned long flags;
- struct bio *bio;
+ struct bio *bio = NULL;
spin_lock_irqsave(&dio->bio_lock, flags);
- while (dio->bio_list == NULL) {
+ while (wait_for_more_bios(dio)) {
set_current_state(TASK_UNINTERRUPTIBLE);
- if (dio->bio_list == NULL) {
+ if (wait_for_more_bios(dio)) {
dio->waiter = current;
spin_unlock_irqrestore(&dio->bio_lock, flags);
io_schedule();
@@ -409,8 +402,10 @@ static struct bio *dio_await_one(struct
}
set_current_state(TASK_RUNNING);
}
- bio = dio->bio_list;
- dio->bio_list = bio->bi_private;
+ if (dio->bio_list) {
+ bio = dio->bio_list;
+ dio->bio_list = bio->bi_private;
+ }
spin_unlock_irqrestore(&dio->bio_lock, flags);
return bio;
}
@@ -439,25 +434,24 @@ static int dio_bio_complete(struct dio *
}
bio_put(bio);
}
- finished_one_bio(dio);
return uptodate ? 0 : -EIO;
}
/*
- * Wait on and process all in-flight BIOs.
+ * Wait on and process all in-flight BIOs. This must only be called once
+ * all bios have been issued so that the refcount can only decrease.
+ * This just waits for all bios to make it through dio_bio_complete. IO
+ * errors are propogated through dio->io_error and should be propogated via
+ * dio_complete().
*/
static void dio_await_completion(struct dio *dio)
{
- /*
- * The bio_lock is not held for the read of bio_count.
- * This is ok since it is the dio_bio_complete() that changes
- * bio_count.
- */
- while (dio->bio_count) {
- struct bio *bio = dio_await_one(dio);
- /* io errors are propogated through dio->io_error */
- dio_bio_complete(dio, bio);
- }
+ struct bio *bio;
+ do {
+ bio = dio_await_one(dio);
+ if (bio)
+ dio_bio_complete(dio, bio);
+ } while (bio);
}
/*
@@ -987,16 +981,7 @@ direct_io_worker(int rw, struct kiocb *i
dio->iocb = iocb;
dio->i_size = i_size_read(inode);
- /*
- * BIO completion state.
- *
- * ->bio_count starts out at one, and we decrement it to zero after all
- * BIOs are submitted. This to avoid the situation where a really fast
- * (or synchronous) device could take the count to zero while we're
- * still submitting BIOs.
- */
- dio->bio_count = 1;
- dio->bios_in_flight = 0;
+ atomic_set(&dio->refcount, 1);
spin_lock_init(&dio->bio_lock);
dio->bio_list = NULL;
dio->waiter = NULL;
@@ -1103,7 +1088,11 @@ direct_io_worker(int rw, struct kiocb *i
}
if (ret == 0)
ret = dio->result;
- finished_one_bio(dio); /* This can free the dio */
+
+ /* this can free the dio */
+ if (atomic_dec_and_test(&dio->refcount))
+ dio_complete_aio(dio);
+
if (should_wait) {
unsigned long flags;
/*
@@ -1114,7 +1103,7 @@ direct_io_worker(int rw, struct kiocb *i
spin_lock_irqsave(&dio->bio_lock, flags);
set_current_state(TASK_UNINTERRUPTIBLE);
- while (dio->bio_count) {
+ while (atomic_read(&dio->refcount)) {
spin_unlock_irqrestore(&dio->bio_lock, flags);
io_schedule();
spin_lock_irqsave(&dio->bio_lock, flags);
@@ -1125,7 +1114,6 @@ direct_io_worker(int rw, struct kiocb *i
kfree(dio);
}
} else {
- finished_one_bio(dio);
dio_await_completion(dio);
ret = dio_complete(dio, offset, ret);
@@ -1138,7 +1126,11 @@ direct_io_worker(int rw, struct kiocb *i
* i/o, we have to mark the the aio complete.
*/
aio_complete(iocb, ret, 0);
- kfree(dio);
+
+ if (atomic_dec_and_test(&dio->refcount))
+ kfree(dio);
+ else
+ BUG();
}
return ret;
}
On Tue, Sep 05, 2006 at 04:57:32PM -0700, Zach Brown wrote:
> There have been a lot of bugs recently due to the way direct_io_worker() tries
> to decide how to finish direct IO operations. In the worst examples it has
> failed to call aio_complete() at all (hang) or called it too many times (oops).
>
> This set of patches cleans up the completion phase with the goal of removing
> the complexity that led to these bugs. We end up with one path that
> calculates the result of the operation after all of the bios have completed.
> We decide when to generate a result of the operation using that path based on
> the final release of a refcount on the dio structure.
Very nice piece of work! Thanks for taking this up :)
I have looked through all the patches and the completion path unification
logic looks sound to me (btw, the explanations and comments are very good too).
Not the usual bandaids that we often end up with, but a real cleanup that
should go some way in making at least the most error-prone part of the DIO
code more maintainable (I hope/wish we could also do something similar with
simplifying the locking as well).
Of course with this code, we have to await rigorous testing
... and more reviews, but please consider this as my ack for the approach.
Regards
Suparna
>
> I tried to progress towards the final state in steps that were relatively easy
> to understand. Each step should compile but I only tested the final result of
> having all the patches applied.
>
> The patches result in a slight net decrease in code and binary size:
>
> 2.6.18-rc4-dio-cleanup/fs/direct-io.c | 8
> 2.6.18-rc5-dio-cleanup/fs/direct-io.c | 94 +++++------
> 2.6.18-rc5-dio-cleanup/mm/filemap.c | 4
> fs/direct-io.c | 273 ++++++++++++++--------------------
> 4 files changed, 159 insertions(+), 220 deletions(-)
>
> text data bss dec hex filename
> 2592385 450996 210296 3253677 31a5ad vmlinux.before
> 2592113 450980 210296 3253389 31a48d vmlinux.after
>
> The patches pass light testing with aio-stress, the direct IO tests I could
> manage to get running in LTP, and some home-brew functional tests. It's still
> making its way through stress testing; I don't think it should be merged until
> we hear back from that more rigorous testing.
>
> I hoped to get some feedback (and maybe volunteers for testing!) by sending the
> patches out before waiting for the stress tests.
>
> - z
>
--
Suparna Bhattacharya ([email protected])
Linux Technology Center
IBM Software Lab, India
==> Regarding [RFC 0/5] dio: clean up completion phase of direct_io_worker(); Zach Brown <[email protected]> adds:
zach.brown> There have been a lot of bugs recently due to the way
zach.brown> direct_io_worker() tries to decide how to finish direct IO
zach.brown> operations. In the worst examples it has failed to call
zach.brown> aio_complete() at all (hang) or called it too many times
zach.brown> (oops).
zach.brown> This set of patches cleans up the completion phase with the
zach.brown> goal of removing the complexity that led to these bugs. We
zach.brown> end up with one path that calculates the result of the
zach.brown> operation after all of the bios have completed. We decide
zach.brown> when to generate a result of the operation using that path
zach.brown> based on the final release of a refcount on the dio structure.
[...]
zach.brown> I hoped to get some feedback (and maybe volunteers for
zach.brown> testing!) by sending the patches out before waiting for the
zach.brown> stress tests.
This all looks good, the code is much easier to follow. What do you think
about making dio->result an unsigned quantity? It should never be negative
now that there is an io_error field.
ACK.
Jeff
> code more maintainable (I hope/wish we could also do something similar with
> simplifying the locking as well).
I agree, and that is definitely on my medium-term todo list.
> Of course with this code, we have to await rigorous testing
> ... and more reviews, but please consider this as my ack for the approach.
Yeah, absolutely. Is there a chance that IBM can throw some testing
cycles at it? I have it queued up for some moderately sized DB runs
over here (FC arrays, cable pulling, that kind of thing.)
- z
> This all looks good, the code is much easier to follow. What do you think
> about making dio->result an unsigned quantity? It should never be negative
> now that there is an io_error field.
Yeah, that has always bugged me too. I considered renaming it 'issued',
or something, as part of this patchset but thought we could do it later.
While we're on this topic, I'm nervous that we increment it when
do_direct_IO fails. It might be sound, but that we consider it the
amount of work "transferred" for dio->end_io makes me want to make sure
there aren't confusing corner cases here.
- z
==> Regarding Re: [RFC 0/5] dio: clean up completion phase of direct_io_worker(); Zach Brown <[email protected]> adds:
>> This all looks good, the code is much easier to follow. What do you think
>> about making dio->result an unsigned quantity? It should never be negative
>> now that there is an io_error field.
zach.brown> Yeah, that has always bugged me too. I considered renaming it
zach.brown> 'issued', or something, as part of this patchset but thought we
zach.brown> could do it later.
I figured since you were doing some house-keeping, we might as well clean
up as much as possible. It's up to you, though. ;)
zach.brown> While we're on this topic, I'm nervous that we increment it
zach.brown> when do_direct_IO fails. It might be sound, but that we
zach.brown> consider it the amount of work "transferred" for dio->end_io
zach.brown> makes me want to make sure there aren't confusing corner cases
zach.brown> here.
It does look non-obvious when reading the code. However, I'm pretty sure
it's right. dio->block_in_file is only updated if there is no error
returned from submit_page_section. As such, it really does reflect how
much work was done before the error, right? It does seem odd that we do
this math in two separate places, though.
-Jeff
On Tue, 05 Sep 2006, Zach Brown wrote:
> There was a 3/5 but the bogofilter decided it was spam.
Certainly not. The training data you fed into bogofilter made that
message look like spam. -- Yes, that's a difference.
--
Matthias Andree
Hello,
I applied the DIO patches and built the kernel 2.6.18-rc6
(kernel.org), and executed the Aio-DioStressTest of the LTP test suite
(ltp-full-20060822) on the EXT2, EXT3 and XFS filesystems. For the EXT2 and
EXT3 filesystems the tests went okay, but I got a stack trace on the XFS
filesystem and the machine went down.
kernel BUG at kernel/workqueue.c:113!
invalid opcode: 0000 [#1]
PREEMPT SMP DEBUG_PAGEALLOC
Modules linked in:
CPU: 2
EIP: 0060:[<c012df03>] Not tainted VLI
EFLAGS: 00010202 (2.6.18-rc6-dio #1)
EIP is at queue_work+0x86/0x90
eax: f7900780 ebx: f790077c ecx: f7900754 edx: 00000002
esi: c5f4f8e0 edi: 00000000 ebp: c5d63ca8 esp: c5d63c94
ds: 007b es: 007b ss: 0068
Process swapper (pid: 0, ti=c5d62000 task=c5d2b030 task.ti=c5d62000)
Stack: f268f180 c5d63cb4 3e39c000 00000000 00010000 c5d63cb0 c02b43a2
c5d63cc8
c02b5e2f f7900754 00000000 3e39c000 f4e90000 c5d63d04 c018e77a
f3235780
3e39c000 00000000 00010000 f2604b20 f268f180 c5d63d04 00010000
3e39c000
Call Trace:
[<c0103cea>] show_stack_log_lvl+0xcc/0xdc
[<c0103f0f>] show_registers+0x1b7/0x22b
[<c0104143>] die+0x139/0x235
[<c01042bd>] do_trap+0x7e/0xb4
[<c01045f3>] do_invalid_op+0xb5/0xbf
[<c0103945>] error_code+0x39/0x40
[<c02b43a2>] xfs_finish_ioend+0x20/0x22
[<c02b5e2f>] xfs_end_io_direct+0x3c/0x68
[<c018e77a>] dio_complete+0xe3/0xfe
[<c018e82d>] dio_bio_end_aio+0x98/0xb1
[<c016e889>] bio_endio+0x4e/0x78
[<c02cdc89>] __end_that_request_first+0xcd/0x416
[<c02ce015>] end_that_request_chunk+0x1f/0x21
[<c0380442>] scsi_end_request+0x2d/0xe8
[<c0380715>] scsi_io_completion+0x10c/0x409
[<c03a986b>] sd_rw_intr+0x188/0x2c6
[<c037b832>] scsi_finish_command+0x4e/0x96
[<c0380f44>] scsi_softirq_done+0xaa/0x10b
[<c02ce073>] blk_done_softirq+0x5c/0x6a
[<c01227a4>] __do_softirq+0x6d/0xe3
[<c0122858>] do_softirq+0x3e/0x40
[<c01228a1>] irq_exit+0x47/0x49
[<c01054ef>] do_IRQ+0x2f/0x5d
[<c0103826>] common_interrupt+0x1a/0x20
[<c0100d84>] cpu_idle+0x9a/0xb0
[<c010e077>] start_secondary+0xeb/0x32c
[<00000000>] 0x0
[<c5d63fb4>] 0xc5d63fb4
Code: ff ff b8 01 00 00 00 e8 87 9a fe ff 89 e0 25 00 e0 ff ff 8b 40 08
a8 08 75 0a 83 c4 08 89 f8 5b 5e 5f 5d c3 e8 2f 0f 3
EIP: [<c012df03>] queue_work+0x86/0x90 SS:ESP 0068:c5d63c94
<0>Kernel panic - not syncing: Fatal exception in interrupt
Also, I executed some of the tests from
http://developer.osdl.org/daniel/AIO/TESTS/ and they went fine.
For further testing, I am planning to put a stress workload on DB2 by
enabling the AIO-DIO features of DB2. And for error path testing, I am
contemplating using kprobes to inject IO errors.
Will let you know the progress as it happens.
Regards
Veerendra C
LTC-ISL, IBM.
> on the EXT2, EXT3 and XFS filesystems. For the EXT2 and EXT3 filesystems the
> tests went okay, but I got a stack trace on the XFS filesystem and the machine
> went down.
Fantastic, thanks for running these tests.
> kernel BUG at kernel/workqueue.c:113!
> EIP is at queue_work+0x86/0x90
We were able to set the pending bit but then found that list_empty()
failed on the work queue's entry list_head. Let's call this memory
corruption of some kind.
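For reference, the check that fires looks roughly like this (quoting 2.6.18's
queue_work() from memory, so take the details with a grain of salt):

	if (!test_and_set_bit(0, &work->pending)) {
		BUG_ON(!list_empty(&work->entry));
		__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
		ret = 1;
	}

The pending bit was clear but the work_struct was already on a list, which is
consistent with its memory having been freed and reused.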
> [<c02b43a2>] xfs_finish_ioend+0x20/0x22
> [<c02b5e2f>] xfs_end_io_direct+0x3c/0x68
> [<c018e77a>] dio_complete+0xe3/0xfe
> [<c018e82d>] dio_bio_end_aio+0x98/0xb1
> [<c016e889>] bio_endio+0x4e/0x78
> [<c02cdc89>] __end_that_request_first+0xcd/0x416
It was completing an AIO request.
ret = blockdev_direct_IO_own_locking(rw, iocb, inode,
iomap.iomap_target->bt_bdev,
iov, offset, nr_segs,
xfs_get_blocks_direct,
xfs_end_io_direct);
if (unlikely(ret <= 0 && iocb->private))
xfs_destroy_ioend(iocb->private);
It looks like xfs_vm_direct_io() is destroying the ioend in the case
where direct IO is returning -EIOCBQUEUED. Later the AIO will complete
and try to call queue_work on the freed ioend. This wasn't a problem
before when blkdev_direct_IO_*() would just return the number of bytes
in the op that was in flight. That test should be
if (unlikely(ret != -EIOCBQUEUED && iocb->private))
I'll update the patch set and send it out.
This makes me worry that XFS might have other paths that need to know
about the magical -EIOCBQUEUED case, which actually means that an AIO DIO
is in flight.
Could I coerce some XFS guys into investigating whether we might have
other problems with bubbling -EIOCBQUEUED up from
blockdev_direct_IO_own_locking() through to xfs_file_aio_write()'s
caller before calling xfs_end_io_direct()?
- z