2014-07-23 22:56:09

by Ming Lei

Subject: [PATCH 0/9] block/aio: loop mq conversion and kernel aio

Hi,

The first 6 patches convert the current loop driver to blk-mq,
which improves loop's scalability a lot.

The 7th and 8th patches introduce kernel AIO support; most of
this is borrowed from Dave's work last year, and thanks to
ITER_BVEC it is now much easier to implement kernel AIO.

The 9th patch uses kernel AIO with O_DIRECT to improve loop's
single-thread performance.

With blk-mq and kernel AIO, both the scalability and the
performance of the loop driver improve a lot.

Thanks,


2014-07-23 22:56:38

by Ming Lei

Subject: [PATCH 1/9] blk-mq: export blk_mq_freeze_queue and blk_mq_unfreeze_queue

The two helpers are handy for switching the backing file
in the loop driver, so export them.
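
For illustration, a minimal sketch of the intended usage pattern
(hypothetical driver code; the loop conversion in patch 3 is the real
user, and swap_backing_file() is an illustrative helper):

        blk_mq_freeze_queue(q);         /* drain all in-flight requests */
        swap_backing_file(dev, file);   /* safe while the queue is frozen */
        blk_mq_unfreeze_queue(q);       /* accept new requests again */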

Signed-off-by: Ming Lei <[email protected]>
---
block/blk-mq.c | 4 +++-
block/blk-mq.h | 1 -
include/linux/blk-mq.h | 2 ++
3 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/block/blk-mq.c b/block/blk-mq.c
index 5189cb1..07e1033 100644
--- a/block/blk-mq.c
+++ b/block/blk-mq.c
@@ -120,8 +120,9 @@ void blk_mq_freeze_queue(struct request_queue *q)
blk_mq_run_queues(q, false);
wait_event(q->mq_freeze_wq, percpu_ref_is_zero(&q->mq_usage_counter));
}
+EXPORT_SYMBOL(blk_mq_freeze_queue);

-static void blk_mq_unfreeze_queue(struct request_queue *q)
+void blk_mq_unfreeze_queue(struct request_queue *q)
{
bool wake = false;

@@ -134,6 +135,7 @@ static void blk_mq_unfreeze_queue(struct request_queue *q)
wake_up_all(&q->mq_freeze_wq);
}
}
+EXPORT_SYMBOL(blk_mq_unfreeze_queue);

bool blk_mq_can_queue(struct blk_mq_hw_ctx *hctx)
{
diff --git a/block/blk-mq.h b/block/blk-mq.h
index ca4964a..3800316 100644
--- a/block/blk-mq.h
+++ b/block/blk-mq.h
@@ -28,7 +28,6 @@ struct blk_mq_ctx {
void __blk_mq_complete_request(struct request *rq);
void blk_mq_run_hw_queue(struct blk_mq_hw_ctx *hctx, bool async);
void blk_mq_init_flush(struct request_queue *q);
-void blk_mq_freeze_queue(struct request_queue *q);
void blk_mq_free_queue(struct request_queue *q);
void blk_mq_clone_flush_request(struct request *flush_rq,
struct request *orig_rq);
diff --git a/include/linux/blk-mq.h b/include/linux/blk-mq.h
index eb726b9..1cc1baa 100644
--- a/include/linux/blk-mq.h
+++ b/include/linux/blk-mq.h
@@ -175,6 +175,8 @@ void blk_mq_start_hw_queues(struct request_queue *q);
void blk_mq_start_stopped_hw_queues(struct request_queue *q, bool async);
void blk_mq_delay_queue(struct blk_mq_hw_ctx *hctx, unsigned long msecs);
void blk_mq_tag_busy_iter(struct blk_mq_tags *tags, void (*fn)(void *data, unsigned long *), void *data);
+void blk_mq_freeze_queue(struct request_queue *q);
+void blk_mq_unfreeze_queue(struct request_queue *q);

/*
* Driver command data is immediately after the request. So subtract request
--
1.7.9.5

2014-07-23 22:57:17

by Ming Lei

Subject: [PATCH 2/9] blk-mq: introduce init_flush_rq_fn callback in 'blk_mq_ops'

Currently the pdu of the flush rq is simply copied from another
rq with memcpy(), which is not enough to initialize pointer fields
correctly, so introduce a callback that lets the driver handle
this case easily.
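
For illustration, a minimal sketch of such a callback (hypothetical
driver code; patch 3 adds the real implementation for loop):

        static int my_init_flush_rq(void *data, struct request_queue *q,
                                    struct request *flush_rq,
                                    const struct request *src_rq)
        {
                struct my_cmd *cmd = blk_mq_rq_to_pdu(flush_rq);

                /* memcpy() copied src_rq's pdu verbatim, so any pointer
                 * in it still refers to src_rq; repoint it at flush_rq */
                cmd->rq = flush_rq;
                return 0;
        }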

Signed-off-by: Ming Lei <[email protected]>
---
block/blk-mq.c | 7 +++++++
include/linux/blk-mq.h | 11 +++++++++++
2 files changed, 18 insertions(+)

diff --git a/block/blk-mq.c b/block/blk-mq.c
index 07e1033..295cfc8d 100644
--- a/block/blk-mq.c
+++ b/block/blk-mq.c
@@ -285,11 +285,18 @@ void blk_mq_clone_flush_request(struct request *flush_rq,
{
struct blk_mq_hw_ctx *hctx =
orig_rq->q->mq_ops->map_queue(orig_rq->q, orig_rq->mq_ctx->cpu);
+ int ret = 0;

flush_rq->mq_ctx = orig_rq->mq_ctx;
flush_rq->tag = orig_rq->tag;
memcpy(blk_mq_rq_to_pdu(flush_rq), blk_mq_rq_to_pdu(orig_rq),
hctx->cmd_size);
+
+ if (orig_rq->q->mq_ops->init_flush_rq)
+ ret = orig_rq->q->mq_ops->init_flush_rq(
+ orig_rq->q->tag_set->driver_data,
+ orig_rq->q, flush_rq, orig_rq);
+ WARN_ON(ret);
}

inline void __blk_mq_end_io(struct request *rq, int error)
diff --git a/include/linux/blk-mq.h b/include/linux/blk-mq.h
index 1cc1baa..df000d4 100644
--- a/include/linux/blk-mq.h
+++ b/include/linux/blk-mq.h
@@ -85,6 +85,8 @@ typedef int (init_request_fn)(void *, struct request *, unsigned int,
unsigned int, unsigned int);
typedef void (exit_request_fn)(void *, struct request *, unsigned int,
unsigned int);
+typedef int (init_flush_rq_fn)(void *, struct request_queue *,
+ struct request *, const struct request *);

struct blk_mq_ops {
/*
@@ -119,6 +121,15 @@ struct blk_mq_ops {
*/
init_request_fn *init_request;
exit_request_fn *exit_request;
+
+ /*
+ * Called after the flush rq is cloned; pointer fields can't be
+ * initialized correctly with a simple memcpy(), so the driver can
+ * use this callback to reinitialize the flush rq. The 3rd
+ * parameter is the flush rq, which has been cloned/copied
+ * from another rq (the 4th parameter).
+ */
+ init_flush_rq_fn *init_flush_rq;
};

enum {
--
1.7.9.5

2014-07-23 22:57:39

by Ming Lei

Subject: [PATCH 3/9] block: loop: convert to blk-mq

The conversion is fairly straightforward: a work queue is used to
dispatch the requests of the loop block device, so scalability
improves a lot, and throughput also increases a lot in the case of
concurrent I/O requests.

Another benefit is that the loop driver code gets simplified
considerably, so the patch can be viewed as a cleanup too.
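
Condensed from the diff below, the new dispatch path is simply:

        /* .queue_rq: called by blk-mq; just punt the request to a workqueue */
        static int loop_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *rq)
        {
                struct loop_cmd *cmd = blk_mq_rq_to_pdu(rq);

                queue_work(loop_wq, &cmd->work);
                return BLK_MQ_RQ_QUEUE_OK;
        }

        /* the work item then performs the file-backed I/O in process
         * context and finishes via blk_mq_complete_request() */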

Signed-off-by: Ming Lei <[email protected]>
---
drivers/block/loop.c | 294 ++++++++++++++++++++++----------------------------
drivers/block/loop.h | 14 +--
2 files changed, 137 insertions(+), 171 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 6cb1beb..1af5265 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -75,6 +75,7 @@
#include <linux/sysfs.h>
#include <linux/miscdevice.h>
#include <linux/falloc.h>
+#include <linux/blk-mq.h>
#include "loop.h"

#include <asm/uaccess.h>
@@ -85,6 +86,8 @@ static DEFINE_MUTEX(loop_index_mutex);
static int max_part;
static int part_shift;

+static struct workqueue_struct *loop_wq;
+
/*
* Transfer functions
*/
@@ -466,109 +469,37 @@ out:
return ret;
}

-/*
- * Add bio to back of pending list
- */
-static void loop_add_bio(struct loop_device *lo, struct bio *bio)
-{
- lo->lo_bio_count++;
- bio_list_add(&lo->lo_bio_list, bio);
-}
-
-/*
- * Grab first pending buffer
- */
-static struct bio *loop_get_bio(struct loop_device *lo)
-{
- lo->lo_bio_count--;
- return bio_list_pop(&lo->lo_bio_list);
-}
-
-static void loop_make_request(struct request_queue *q, struct bio *old_bio)
-{
- struct loop_device *lo = q->queuedata;
- int rw = bio_rw(old_bio);
-
- if (rw == READA)
- rw = READ;
-
- BUG_ON(!lo || (rw != READ && rw != WRITE));
-
- spin_lock_irq(&lo->lo_lock);
- if (lo->lo_state != Lo_bound)
- goto out;
- if (unlikely(rw == WRITE && (lo->lo_flags & LO_FLAGS_READ_ONLY)))
- goto out;
- if (lo->lo_bio_count >= q->nr_congestion_on)
- wait_event_lock_irq(lo->lo_req_wait,
- lo->lo_bio_count < q->nr_congestion_off,
- lo->lo_lock);
- loop_add_bio(lo, old_bio);
- wake_up(&lo->lo_event);
- spin_unlock_irq(&lo->lo_lock);
- return;
-
-out:
- spin_unlock_irq(&lo->lo_lock);
- bio_io_error(old_bio);
-}
-
struct switch_request {
struct file *file;
struct completion wait;
};

-static void do_loop_switch(struct loop_device *, struct switch_request *);
-
-static inline void loop_handle_bio(struct loop_device *lo, struct bio *bio)
+static inline int loop_handle_bio(struct loop_device *lo, struct bio *bio)
{
- if (unlikely(!bio->bi_bdev)) {
- do_loop_switch(lo, bio->bi_private);
- bio_put(bio);
- } else {
- int ret = do_bio_filebacked(lo, bio);
- bio_endio(bio, ret);
- }
+ int ret = do_bio_filebacked(lo, bio);
+ return ret;
}

/*
- * worker thread that handles reads/writes to file backed loop devices,
- * to avoid blocking in our make_request_fn. it also does loop decrypting
- * on reads for block backed loop, as that is too heavy to do from
- * b_end_io context where irqs may be disabled.
- *
- * Loop explanation: loop_clr_fd() sets lo_state to Lo_rundown before
- * calling kthread_stop(). Therefore once kthread_should_stop() is
- * true, make_request will not place any more requests. Therefore
- * once kthread_should_stop() is true and lo_bio is NULL, we are
- * done with the loop.
+ * Do the actual switch; called with the queue frozen
*/
-static int loop_thread(void *data)
+static void do_loop_switch(struct loop_device *lo, struct switch_request *p)
{
- struct loop_device *lo = data;
- struct bio *bio;
-
- set_user_nice(current, MIN_NICE);
-
- while (!kthread_should_stop() || !bio_list_empty(&lo->lo_bio_list)) {
-
- wait_event_interruptible(lo->lo_event,
- !bio_list_empty(&lo->lo_bio_list) ||
- kthread_should_stop());
-
- if (bio_list_empty(&lo->lo_bio_list))
- continue;
- spin_lock_irq(&lo->lo_lock);
- bio = loop_get_bio(lo);
- if (lo->lo_bio_count < lo->lo_queue->nr_congestion_off)
- wake_up(&lo->lo_req_wait);
- spin_unlock_irq(&lo->lo_lock);
+ struct file *file = p->file;
+ struct file *old_file = lo->lo_backing_file;
+ struct address_space *mapping;

- BUG_ON(!bio);
- loop_handle_bio(lo, bio);
- }
+ /* if no new file, only flush of queued bios requested */
+ if (!file)
+ return;

- return 0;
+ mapping = file->f_mapping;
+ mapping_set_gfp_mask(old_file->f_mapping, lo->old_gfp_mask);
+ lo->lo_backing_file = file;
+ lo->lo_blocksize = S_ISBLK(mapping->host->i_mode) ?
+ mapping->host->i_bdev->bd_block_size : PAGE_SIZE;
+ lo->old_gfp_mask = mapping_gfp_mask(mapping);
+ mapping_set_gfp_mask(mapping, lo->old_gfp_mask & ~(__GFP_IO|__GFP_FS));
}

/*
@@ -579,15 +510,18 @@ static int loop_thread(void *data)
static int loop_switch(struct loop_device *lo, struct file *file)
{
struct switch_request w;
- struct bio *bio = bio_alloc(GFP_KERNEL, 0);
- if (!bio)
- return -ENOMEM;
- init_completion(&w.wait);
+
w.file = file;
- bio->bi_private = &w;
- bio->bi_bdev = NULL;
- loop_make_request(lo->lo_queue, bio);
- wait_for_completion(&w.wait);
+
+ /* freeze queue and wait for completion of scheduled requests */
+ blk_mq_freeze_queue(lo->lo_queue);
+
+ /* do the switch action */
+ do_loop_switch(lo, &w);
+
+ /* unfreeze */
+ blk_mq_unfreeze_queue(lo->lo_queue);
+
return 0;
}

@@ -596,39 +530,10 @@ static int loop_switch(struct loop_device *lo, struct file *file)
*/
static int loop_flush(struct loop_device *lo)
{
- /* loop not yet configured, no running thread, nothing to flush */
- if (!lo->lo_thread)
- return 0;
-
return loop_switch(lo, NULL);
}

/*
- * Do the actual switch; called from the BIO completion routine
- */
-static void do_loop_switch(struct loop_device *lo, struct switch_request *p)
-{
- struct file *file = p->file;
- struct file *old_file = lo->lo_backing_file;
- struct address_space *mapping;
-
- /* if no new file, only flush of queued bios requested */
- if (!file)
- goto out;
-
- mapping = file->f_mapping;
- mapping_set_gfp_mask(old_file->f_mapping, lo->old_gfp_mask);
- lo->lo_backing_file = file;
- lo->lo_blocksize = S_ISBLK(mapping->host->i_mode) ?
- mapping->host->i_bdev->bd_block_size : PAGE_SIZE;
- lo->old_gfp_mask = mapping_gfp_mask(mapping);
- mapping_set_gfp_mask(mapping, lo->old_gfp_mask & ~(__GFP_IO|__GFP_FS));
-out:
- complete(&p->wait);
-}
-
-
-/*
* loop_change_fd switched the backing store of a loopback device to
* a new file. This is useful for operating system installers to free up
* the original file and in High Availability environments to switch to
@@ -889,12 +794,9 @@ static int loop_set_fd(struct loop_device *lo, fmode_t mode,
lo->transfer = transfer_none;
lo->ioctl = NULL;
lo->lo_sizelimit = 0;
- lo->lo_bio_count = 0;
lo->old_gfp_mask = mapping_gfp_mask(mapping);
mapping_set_gfp_mask(mapping, lo->old_gfp_mask & ~(__GFP_IO|__GFP_FS));

- bio_list_init(&lo->lo_bio_list);
-
if (!(lo_flags & LO_FLAGS_READ_ONLY) && file->f_op->fsync)
blk_queue_flush(lo->lo_queue, REQ_FLUSH);

@@ -906,14 +808,7 @@ static int loop_set_fd(struct loop_device *lo, fmode_t mode,

set_blocksize(bdev, lo_blocksize);

- lo->lo_thread = kthread_create(loop_thread, lo, "loop%d",
- lo->lo_number);
- if (IS_ERR(lo->lo_thread)) {
- error = PTR_ERR(lo->lo_thread);
- goto out_clr;
- }
lo->lo_state = Lo_bound;
- wake_up_process(lo->lo_thread);
if (part_shift)
lo->lo_flags |= LO_FLAGS_PARTSCAN;
if (lo->lo_flags & LO_FLAGS_PARTSCAN)
@@ -925,18 +820,6 @@ static int loop_set_fd(struct loop_device *lo, fmode_t mode,
bdgrab(bdev);
return 0;

-out_clr:
- loop_sysfs_exit(lo);
- lo->lo_thread = NULL;
- lo->lo_device = NULL;
- lo->lo_backing_file = NULL;
- lo->lo_flags = 0;
- set_capacity(lo->lo_disk, 0);
- invalidate_bdev(bdev);
- bd_set_size(bdev, 0);
- kobject_uevent(&disk_to_dev(bdev->bd_disk)->kobj, KOBJ_CHANGE);
- mapping_set_gfp_mask(mapping, lo->old_gfp_mask);
- lo->lo_state = Lo_unbound;
out_putf:
fput(file);
out:
@@ -1012,11 +895,6 @@ static int loop_clr_fd(struct loop_device *lo)

spin_lock_irq(&lo->lo_lock);
lo->lo_state = Lo_rundown;
- spin_unlock_irq(&lo->lo_lock);
-
- kthread_stop(lo->lo_thread);
-
- spin_lock_irq(&lo->lo_lock);
lo->lo_backing_file = NULL;
spin_unlock_irq(&lo->lo_lock);

@@ -1028,7 +906,6 @@ static int loop_clr_fd(struct loop_device *lo)
lo->lo_offset = 0;
lo->lo_sizelimit = 0;
lo->lo_encrypt_key_size = 0;
- lo->lo_thread = NULL;
memset(lo->lo_encrypt_key, 0, LO_KEY_SIZE);
memset(lo->lo_crypt_name, 0, LO_NAME_SIZE);
memset(lo->lo_file_name, 0, LO_NAME_SIZE);
@@ -1601,6 +1478,84 @@ int loop_unregister_transfer(int number)
EXPORT_SYMBOL(loop_register_transfer);
EXPORT_SYMBOL(loop_unregister_transfer);

+static int loop_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *rq)
+{
+ struct loop_cmd *cmd = blk_mq_rq_to_pdu(rq);
+
+ queue_work(loop_wq, &cmd->work);
+ return BLK_MQ_RQ_QUEUE_OK;
+}
+
+static int loop_init_hctx(struct blk_mq_hw_ctx *hctx, void *data,
+ unsigned int index)
+{
+ struct loop_device *lo = data;
+
+ hctx->driver_data = lo;
+ return 0;
+}
+
+static void loop_softirq_done_fn(struct request *rq)
+{
+ blk_mq_end_io(rq, rq->errors);
+}
+
+static void loop_queue_work(struct work_struct *work)
+{
+ struct loop_cmd *cmd =
+ container_of(work, struct loop_cmd, work);
+ const bool write = cmd->rq->cmd_flags & REQ_WRITE;
+ struct loop_device *lo = cmd->lo;
+ int ret = -EIO;
+ struct bio *bio;
+
+ if (lo->lo_state != Lo_bound)
+ goto failed;
+
+ if (write && (lo->lo_flags & LO_FLAGS_READ_ONLY))
+ goto failed;
+
+ ret = 0;
+ __rq_for_each_bio(bio, cmd->rq)
+ ret |= loop_handle_bio(lo, bio);
+
+ failed:
+ if (ret)
+ cmd->rq->errors = -EIO;
+ blk_mq_complete_request(cmd->rq);
+}
+
+static int loop_init_request(void *data, struct request *rq,
+ unsigned int hctx_idx, unsigned int request_idx,
+ unsigned int numa_node)
+{
+ struct loop_cmd *cmd = blk_mq_rq_to_pdu(rq);
+
+ cmd->rq = rq;
+ cmd->lo = data;
+ INIT_WORK(&cmd->work, loop_queue_work);
+
+ return 0;
+}
+
+static int loop_init_flush_rq(void *data, struct request_queue *q,
+ struct request *flush_rq,
+ const struct request *src_rq)
+{
+ /* borrow initialization helper for common rq */
+ loop_init_request(data, flush_rq, 0, -1, NUMA_NO_NODE);
+ return 0;
+}
+
+static struct blk_mq_ops loop_mq_ops = {
+ .queue_rq = loop_queue_rq,
+ .map_queue = blk_mq_map_queue,
+ .init_hctx = loop_init_hctx,
+ .init_request = loop_init_request,
+ .init_flush_rq = loop_init_flush_rq,
+ .complete = loop_softirq_done_fn,
+};
+
static int loop_add(struct loop_device **l, int i)
{
struct loop_device *lo;
@@ -1627,15 +1582,20 @@ static int loop_add(struct loop_device **l, int i)
i = err;

err = -ENOMEM;
- lo->lo_queue = blk_alloc_queue(GFP_KERNEL);
- if (!lo->lo_queue)
+ lo->tag_set.ops = &loop_mq_ops;
+ lo->tag_set.nr_hw_queues = 1;
+ lo->tag_set.queue_depth = 128;
+ lo->tag_set.numa_node = NUMA_NO_NODE;
+ lo->tag_set.cmd_size = sizeof(struct loop_cmd);
+ lo->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
+ lo->tag_set.driver_data = lo;
+
+ if (blk_mq_alloc_tag_set(&lo->tag_set))
goto out_free_idr;

- /*
- * set queue make_request_fn
- */
- blk_queue_make_request(lo->lo_queue, loop_make_request);
- lo->lo_queue->queuedata = lo;
+ lo->lo_queue = blk_mq_init_queue(&lo->tag_set);
+ if (!lo->lo_queue)
+ goto out_cleanup_tags;

disk = lo->lo_disk = alloc_disk(1 << part_shift);
if (!disk)
@@ -1664,9 +1624,6 @@ static int loop_add(struct loop_device **l, int i)
disk->flags |= GENHD_FL_EXT_DEVT;
mutex_init(&lo->lo_ctl_mutex);
lo->lo_number = i;
- lo->lo_thread = NULL;
- init_waitqueue_head(&lo->lo_event);
- init_waitqueue_head(&lo->lo_req_wait);
spin_lock_init(&lo->lo_lock);
disk->major = LOOP_MAJOR;
disk->first_minor = i << part_shift;
@@ -1680,6 +1637,8 @@ static int loop_add(struct loop_device **l, int i)

out_free_queue:
blk_cleanup_queue(lo->lo_queue);
+out_cleanup_tags:
+ blk_mq_free_tag_set(&lo->tag_set);
out_free_idr:
idr_remove(&loop_index_idr, i);
out_free_dev:
@@ -1692,6 +1651,7 @@ static void loop_remove(struct loop_device *lo)
{
del_gendisk(lo->lo_disk);
blk_cleanup_queue(lo->lo_queue);
+ blk_mq_free_tag_set(&lo->tag_set);
put_disk(lo->lo_disk);
kfree(lo);
}
@@ -1884,6 +1844,10 @@ static int __init loop_init(void)
loop_add(&lo, i);
mutex_unlock(&loop_index_mutex);

+ loop_wq = alloc_workqueue("kloopd", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+ if (!loop_wq)
+ panic("Failed to create kloopd\n");
+
printk(KERN_INFO "loop: module loaded\n");
return 0;

diff --git a/drivers/block/loop.h b/drivers/block/loop.h
index 90df5d6..be796c7 100644
--- a/drivers/block/loop.h
+++ b/drivers/block/loop.h
@@ -13,6 +13,7 @@
#include <linux/blkdev.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>
+#include <linux/workqueue.h>
#include <uapi/linux/loop.h>

/* Possible states of device */
@@ -52,19 +53,20 @@ struct loop_device {
gfp_t old_gfp_mask;

spinlock_t lo_lock;
- struct bio_list lo_bio_list;
- unsigned int lo_bio_count;
int lo_state;
struct mutex lo_ctl_mutex;
- struct task_struct *lo_thread;
- wait_queue_head_t lo_event;
- /* wait queue for incoming requests */
- wait_queue_head_t lo_req_wait;

struct request_queue *lo_queue;
+ struct blk_mq_tag_set tag_set;
struct gendisk *lo_disk;
};

+struct loop_cmd {
+ struct work_struct work;
+ struct request *rq;
+ struct loop_device *lo;
+};
+
/* Support for loadable transfer modules */
struct loop_func_table {
int number; /* filter type */
--
1.7.9.5

2014-07-23 22:57:58

by Ming Lei

Subject: [PATCH 4/9] block: loop: say goodbye to bio

Switch over to block-layer requests completely.
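
Condensed from the diff below, the change is essentially swapping the
per-bio segment iterator for the per-request one; the two fragments
here are illustrative (do_segment() is a stand-in name):

        /* before: iterate the segments of one bio */
        struct bio_vec bvec;
        struct bvec_iter iter;

        bio_for_each_segment(bvec, bio, iter)
                do_segment(&bvec);

        /* after: iterate the segments of every bio in a request */
        struct bio_vec bvec;
        struct req_iterator iter;

        rq_for_each_segment(bvec, rq, iter)
                do_segment(&bvec);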

Signed-off-by: Ming Lei <[email protected]>
---
drivers/block/loop.c | 46 ++++++++++++++++++++--------------------------
1 file changed, 20 insertions(+), 26 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 1af5265..0729c3d 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -287,12 +287,12 @@ static int do_lo_send_write(struct loop_device *lo, struct bio_vec *bvec,
return ret;
}

-static int lo_send(struct loop_device *lo, struct bio *bio, loff_t pos)
+static int lo_send(struct loop_device *lo, struct request *rq, loff_t pos)
{
int (*do_lo_send)(struct loop_device *, struct bio_vec *, loff_t,
struct page *page);
struct bio_vec bvec;
- struct bvec_iter iter;
+ struct req_iterator iter;
struct page *page = NULL;
int ret = 0;

@@ -306,7 +306,7 @@ static int lo_send(struct loop_device *lo, struct bio *bio, loff_t pos)
do_lo_send = do_lo_send_direct_write;
}

- bio_for_each_segment(bvec, bio, iter) {
+ rq_for_each_segment(bvec, rq, iter) {
ret = do_lo_send(lo, &bvec, pos, page);
if (ret < 0)
break;
@@ -394,19 +394,22 @@ do_lo_receive(struct loop_device *lo,
}

static int
-lo_receive(struct loop_device *lo, struct bio *bio, int bsize, loff_t pos)
+lo_receive(struct loop_device *lo, struct request *rq, int bsize, loff_t pos)
{
struct bio_vec bvec;
- struct bvec_iter iter;
+ struct req_iterator iter;
ssize_t s;

- bio_for_each_segment(bvec, bio, iter) {
+ rq_for_each_segment(bvec, rq, iter) {
s = do_lo_receive(lo, &bvec, bsize, pos);
if (s < 0)
return s;

if (s != bvec.bv_len) {
- zero_fill_bio(bio);
+ struct bio *bio;
+
+ __rq_for_each_bio(bio, rq)
+ zero_fill_bio(bio);
break;
}
pos += bvec.bv_len;
@@ -414,17 +417,17 @@ lo_receive(struct loop_device *lo, struct bio *bio, int bsize, loff_t pos)
return 0;
}

-static int do_bio_filebacked(struct loop_device *lo, struct bio *bio)
+static int do_bio_filebacked(struct loop_device *lo, struct request *rq)
{
loff_t pos;
int ret;

- pos = ((loff_t) bio->bi_iter.bi_sector << 9) + lo->lo_offset;
+ pos = ((loff_t) blk_rq_pos(rq) << 9) + lo->lo_offset;

- if (bio_rw(bio) == WRITE) {
+ if (rq->cmd_flags & REQ_WRITE) {
struct file *file = lo->lo_backing_file;

- if (bio->bi_rw & REQ_FLUSH) {
+ if (rq->cmd_flags & REQ_FLUSH) {
ret = vfs_fsync(file, 0);
if (unlikely(ret && ret != -EINVAL)) {
ret = -EIO;
@@ -438,7 +441,7 @@ static int do_bio_filebacked(struct loop_device *lo, struct bio *bio)
* encryption is enabled, because it may give an attacker
* useful information.
*/
- if (bio->bi_rw & REQ_DISCARD) {
+ if (rq->cmd_flags & REQ_DISCARD) {
struct file *file = lo->lo_backing_file;
int mode = FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE;

@@ -448,22 +451,22 @@ static int do_bio_filebacked(struct loop_device *lo, struct bio *bio)
goto out;
}
ret = file->f_op->fallocate(file, mode, pos,
- bio->bi_iter.bi_size);
+ blk_rq_bytes(rq));
if (unlikely(ret && ret != -EINVAL &&
ret != -EOPNOTSUPP))
ret = -EIO;
goto out;
}

- ret = lo_send(lo, bio, pos);
+ ret = lo_send(lo, rq, pos);

- if ((bio->bi_rw & REQ_FUA) && !ret) {
+ if ((rq->cmd_flags & REQ_FUA) && !ret) {
ret = vfs_fsync(file, 0);
if (unlikely(ret && ret != -EINVAL))
ret = -EIO;
}
} else
- ret = lo_receive(lo, bio, lo->lo_blocksize, pos);
+ ret = lo_receive(lo, rq, lo->lo_blocksize, pos);

out:
return ret;
@@ -474,12 +477,6 @@ struct switch_request {
struct completion wait;
};

-static inline int loop_handle_bio(struct loop_device *lo, struct bio *bio)
-{
- int ret = do_bio_filebacked(lo, bio);
- return ret;
-}
-
/*
* Do the actual switch; called with the queue frozen
*/
@@ -1507,7 +1504,6 @@ static void loop_queue_work(struct work_struct *work)
const bool write = cmd->rq->cmd_flags & REQ_WRITE;
struct loop_device *lo = cmd->lo;
int ret = -EIO;
- struct bio *bio;

if (lo->lo_state != Lo_bound)
goto failed;
@@ -1515,9 +1511,7 @@ static void loop_queue_work(struct work_struct *work)
if (write && (lo->lo_flags & LO_FLAGS_READ_ONLY))
goto failed;

- ret = 0;
- __rq_for_each_bio(bio, cmd->rq)
- ret |= loop_handle_bio(lo, bio);
+ ret = do_bio_filebacked(lo, cmd->rq);

failed:
if (ret)
--
1.7.9.5

2014-07-23 22:58:21

by Ming Lei

Subject: [PATCH 5/9] block: loop: introduce lo_discard() and lo_req_flush()

No behaviour change; just move the handling of REQ_DISCARD
and REQ_FLUSH into these two functions.

Signed-off-by: Ming Lei <[email protected]>
---
drivers/block/loop.c | 73 +++++++++++++++++++++++++++-----------------------
1 file changed, 40 insertions(+), 33 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 0729c3d..96f945e 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -417,6 +417,40 @@ lo_receive(struct loop_device *lo, struct request *rq, int bsize, loff_t pos)
return 0;
}

+static int lo_discard(struct loop_device *lo, struct request *rq, loff_t pos)
+{
+ /*
+ * We use punch hole to reclaim the free space used by the
+ * image a.k.a. discard. However we do not support discard if
+ * encryption is enabled, because it may give an attacker
+ * useful information.
+ */
+ struct file *file = lo->lo_backing_file;
+ int mode = FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE;
+ int ret;
+
+ if ((!file->f_op->fallocate) || lo->lo_encrypt_key_size) {
+ ret = -EOPNOTSUPP;
+ goto out;
+ }
+
+ ret = file->f_op->fallocate(file, mode, pos, blk_rq_bytes(rq));
+ if (unlikely(ret && ret != -EINVAL && ret != -EOPNOTSUPP))
+ ret = -EIO;
+ out:
+ return ret;
+}
+
+static int lo_req_flush(struct loop_device *lo, struct request *rq)
+{
+ struct file *file = lo->lo_backing_file;
+ int ret = vfs_fsync(file, 0);
+ if (unlikely(ret && ret != -EINVAL))
+ ret = -EIO;
+
+ return ret;
+}
+
static int do_bio_filebacked(struct loop_device *lo, struct request *rq)
{
loff_t pos;
@@ -425,46 +459,19 @@ static int do_bio_filebacked(struct loop_device *lo, struct request *rq)
pos = ((loff_t) blk_rq_pos(rq) << 9) + lo->lo_offset;

if (rq->cmd_flags & REQ_WRITE) {
- struct file *file = lo->lo_backing_file;
-
- if (rq->cmd_flags & REQ_FLUSH) {
- ret = vfs_fsync(file, 0);
- if (unlikely(ret && ret != -EINVAL)) {
- ret = -EIO;
- goto out;
- }
- }

- /*
- * We use punch hole to reclaim the free space used by the
- * image a.k.a. discard. However we do not support discard if
- * encryption is enabled, because it may give an attacker
- * useful information.
- */
+ if (rq->cmd_flags & REQ_FLUSH)
+ ret = lo_req_flush(lo, rq);
+
if (rq->cmd_flags & REQ_DISCARD) {
- struct file *file = lo->lo_backing_file;
- int mode = FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE;
-
- if ((!file->f_op->fallocate) ||
- lo->lo_encrypt_key_size) {
- ret = -EOPNOTSUPP;
- goto out;
- }
- ret = file->f_op->fallocate(file, mode, pos,
- blk_rq_bytes(rq));
- if (unlikely(ret && ret != -EINVAL &&
- ret != -EOPNOTSUPP))
- ret = -EIO;
+ ret = lo_discard(lo, rq, pos);
goto out;
}

ret = lo_send(lo, rq, pos);

- if ((rq->cmd_flags & REQ_FUA) && !ret) {
- ret = vfs_fsync(file, 0);
- if (unlikely(ret && ret != -EINVAL))
- ret = -EIO;
- }
+ if ((rq->cmd_flags & REQ_FUA) && !ret)
+ ret = lo_req_flush(lo, rq);
} else
ret = lo_receive(lo, rq, lo->lo_blocksize, pos);

--
1.7.9.5

2014-07-23 22:58:36

by Ming Lei

Subject: [PATCH 6/9] block: loop: don't handle REQ_FUA explicitly

The block core handles REQ_FUA via its flush state machine, so
there is no need to handle it explicitly in loop.

Signed-off-by: Ming Lei <[email protected]>
---
drivers/block/loop.c | 14 +++-----------
1 file changed, 3 insertions(+), 11 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 96f945e..96a8913 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -459,23 +459,15 @@ static int do_bio_filebacked(struct loop_device *lo, struct request *rq)
pos = ((loff_t) blk_rq_pos(rq) << 9) + lo->lo_offset;

if (rq->cmd_flags & REQ_WRITE) {
-
if (rq->cmd_flags & REQ_FLUSH)
ret = lo_req_flush(lo, rq);
-
- if (rq->cmd_flags & REQ_DISCARD) {
+ else if (rq->cmd_flags & REQ_DISCARD)
ret = lo_discard(lo, rq, pos);
- goto out;
- }
-
- ret = lo_send(lo, rq, pos);
-
- if ((rq->cmd_flags & REQ_FUA) && !ret)
- ret = lo_req_flush(lo, rq);
+ else
+ ret = lo_send(lo, rq, pos);
} else
ret = lo_receive(lo, rq, lo->lo_blocksize, pos);

-out:
return ret;
}

--
1.7.9.5

2014-07-23 22:59:06

by Ming Lei

Subject: [PATCH 7/9] aio: add aio_kernel_() interface

From: Dave Kleikamp <[email protected]>

This adds an interface that lets kernel callers submit aio iocbs without
going through the user space syscalls. This lets kernel callers avoid
the management limits and overhead of the context. It will also let us
integrate aio operations with other kernel apis that the user space
interface doesn't have access to.

This patch is based on Dave's posts in the links below:

https://lkml.org/lkml/2013/10/16/365
https://groups.google.com/forum/#!topic/linux.kernel/l7mogGJZoKQ

Most of the patch is taken directly from Dave's work.
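
For illustration, a minimal sketch of a kernel caller, assuming a
single-page read; the error convention mirrors the loop usage in
patch 9, and my_complete()/my_submit_read() are illustrative names:

        static void my_complete(u64 user_data, long res)
        {
                struct kiocb *iocb = (struct kiocb *)(uintptr_t)user_data;

                /* res is the byte count on success or a negative errno */
                aio_kernel_free(iocb);
        }

        static int my_submit_read(struct file *filp, struct page *page,
                                  unsigned int len, loff_t pos)
        {
                struct bio_vec bv = { .bv_page = page, .bv_len = len };
                struct iov_iter iter;
                struct kiocb *iocb;
                int ret;

                iocb = aio_kernel_alloc(GFP_NOIO, 0);
                if (!iocb)
                        return -ENOMEM;

                iter.type = ITER_BVEC;
                iter.bvec = &bv;
                iter.nr_segs = 1;
                iter.count = len;
                iter.iov_offset = 0;

                aio_kernel_init_rw(iocb, filp, len, pos, my_complete,
                                   (u64)(uintptr_t)iocb);
                ret = aio_kernel_submit(iocb, IOCB_CMD_READ_ITER, &iter);
                if (ret != -EIOCBQUEUED)
                        /* completed synchronously or failed to submit */
                        my_complete((u64)(uintptr_t)iocb, ret);
                return 0;
        }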

Cc: Zach Brown <[email protected]>
Cc: Dave Kleikamp <[email protected]>
Cc: Benjamin LaHaise <[email protected]>
Cc: Alexander Viro <[email protected]>
Cc: [email protected]
Cc: [email protected] (open list:AIO)
Signed-off-by: Ming Lei <[email protected]>
---
fs/aio.c | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++
include/linux/aio.h | 21 +++++++++-
2 files changed, 134 insertions(+), 1 deletion(-)

diff --git a/fs/aio.c b/fs/aio.c
index d93bfa6..7a081b7 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -948,6 +948,9 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
iocb->ki_ctx = ERR_PTR(-EXDEV);
wake_up_process(iocb->ki_obj.tsk);
return;
+ } else if (is_kernel_kiocb(iocb)) {
+ iocb->ki_obj.complete(iocb->ki_user_data, res);
+ return;
}

if (iocb->ki_list.next) {
@@ -1395,6 +1398,117 @@ rw_common:
return 0;
}

+/*
+ * This allocates an iocb that will be used to submit and track completion of
+ * an IO that is issued from kernel space.
+ *
+ * The caller is expected to call the appropriate aio_kernel_init_() functions
+ * and then call aio_kernel_submit(). From that point forward progress is
+ * guaranteed by the file system aio method. Eventually the caller's
+ * completion callback will be called.
+ *
+ * These iocbs are special. They don't have a context, we don't limit the
+ * number pending, and they can't be canceled.
+ */
+struct kiocb *aio_kernel_alloc(gfp_t gfp, unsigned extra)
+{
+ return kzalloc(sizeof(struct kiocb) + extra, gfp);
+}
+EXPORT_SYMBOL_GPL(aio_kernel_alloc);
+
+void aio_kernel_free(struct kiocb *iocb)
+{
+ kfree(iocb);
+}
+EXPORT_SYMBOL_GPL(aio_kernel_free);
+
+/*
+ * ptr and count can be a buff and bytes or an iov and segs.
+ */
+void aio_kernel_init_rw(struct kiocb *iocb, struct file *filp,
+ size_t nr, loff_t off,
+ void (*complete)(u64 user_data, long res),
+ u64 user_data)
+{
+ iocb->ki_filp = filp;
+ iocb->ki_nbytes = nr;
+ iocb->ki_pos = off;
+ iocb->ki_ctx = (void *)-1;
+
+ iocb->ki_obj.complete = complete;
+ iocb->ki_user_data = user_data;
+}
+EXPORT_SYMBOL_GPL(aio_kernel_init_rw);
+
+static ssize_t aio_read_iter(struct kiocb *iocb, struct iov_iter *iter)
+{
+ struct file *file = iocb->ki_filp;
+ ssize_t ret;
+
+ if (unlikely(!(file->f_mode & FMODE_READ)))
+ return -EBADF;
+
+ ret = security_file_permission(file, MAY_READ);
+ if (unlikely(ret))
+ return ret;
+
+ if (!file->f_op->read_iter)
+ return -EINVAL;
+
+ return file->f_op->read_iter(iocb, iter);
+}
+
+static ssize_t aio_write_iter(struct kiocb *iocb, struct iov_iter *iter)
+{
+ struct file *file = iocb->ki_filp;
+ ssize_t ret;
+
+ if (unlikely(!(file->f_mode & FMODE_WRITE)))
+ return -EBADF;
+
+ ret = security_file_permission(file, MAY_WRITE);
+ if (unlikely(ret))
+ return ret;
+
+ if (!file->f_op->write_iter)
+ return -EINVAL;
+
+ file_start_write(file);
+ ret = file->f_op->write_iter(iocb, iter);
+ file_end_write(file);
+
+ return ret;
+}
+
+/*
+ * The iocb is our responsibility once this is called. The caller must not
+ * reference it.
+ *
+ * Callers must be prepared for their iocb completion callback to be called the
+ * moment they enter this function. The completion callback may be called from
+ * any context.
+ *
+ * Returns: 0: the iocb completion callback will be called with the op result
+ * negative errno: the operation was not submitted and the iocb was freed
+ */
+int aio_kernel_submit(struct kiocb *iocb, unsigned op,
+ struct iov_iter *iter)
+{
+ int ret = -EINVAL;
+
+ if (WARN_ON(!is_kernel_kiocb(iocb) || !iocb->ki_obj.complete
+ || !iocb->ki_filp || !(iter->type & ITER_BVEC)))
+ return ret;
+
+ if (op == IOCB_CMD_READ_ITER)
+ ret = aio_read_iter(iocb, iter);
+ else if (op == IOCB_CMD_WRITE_ITER)
+ ret = aio_write_iter(iocb, iter);
+
+ return ret;
+}
+EXPORT_SYMBOL_GPL(aio_kernel_submit);
+
static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
struct iocb *iocb, bool compat)
{
diff --git a/include/linux/aio.h b/include/linux/aio.h
index d9c92da..c68504d 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -14,6 +14,12 @@ struct kiocb;

#define KIOCB_KEY 0

+/* opcode values not exposed to user space */
+enum {
+ IOCB_CMD_READ_ITER = 0x10000,
+ IOCB_CMD_WRITE_ITER = 0x10001,
+};
+
/*
* We use ki_cancel == KIOCB_CANCELLED to indicate that a kiocb has been either
* cancelled or completed (this makes a certain amount of sense because
@@ -31,13 +37,15 @@ typedef int (kiocb_cancel_fn)(struct kiocb *);

struct kiocb {
struct file *ki_filp;
- struct kioctx *ki_ctx; /* NULL for sync ops */
+ struct kioctx *ki_ctx; /* NULL for sync ops,
+ * -1 for kernel caller */
kiocb_cancel_fn *ki_cancel;
void *private;

union {
void __user *user;
struct task_struct *tsk;
+ void (*complete)(u64 user_data, long res);
} ki_obj;

__u64 ki_user_data; /* user's data for completion */
@@ -59,6 +67,11 @@ static inline bool is_sync_kiocb(struct kiocb *kiocb)
return kiocb->ki_ctx == NULL;
}

+static inline bool is_kernel_kiocb(struct kiocb *kiocb)
+{
+ return kiocb->ki_ctx == (void *)-1;
+}
+
static inline void init_sync_kiocb(struct kiocb *kiocb, struct file *filp)
{
*kiocb = (struct kiocb) {
@@ -77,6 +90,12 @@ extern void exit_aio(struct mm_struct *mm);
extern long do_io_submit(aio_context_t ctx_id, long nr,
struct iocb __user *__user *iocbpp, bool compat);
void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel);
+struct kiocb *aio_kernel_alloc(gfp_t gfp, unsigned extra);
+void aio_kernel_free(struct kiocb *iocb);
+void aio_kernel_init_rw(struct kiocb *iocb, struct file *filp, size_t nr,
+ loff_t off, void (*complete)(u64 user_data, long res),
+ u64 user_data);
+int aio_kernel_submit(struct kiocb *iocb, unsigned op, struct iov_iter *iter);
#else
static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; }
static inline void aio_complete(struct kiocb *iocb, long res, long res2) { }
--
1.7.9.5

2014-07-23 22:59:24

by Ming Lei

Subject: [PATCH 8/9] fs/direct-io: introduce should_dirty for kernel aio

For pages from kernel AIO, the caller is required to dirty
them before issuing direct I/O, so the direct-io code itself
need not dirty them.

The idea is borrowed from Dave's previous post.

Cc: Zach Brown <[email protected]>
Cc: Dave Kleikamp <[email protected]>
Cc: Benjamin LaHaise <[email protected]>
Cc: Alexander Viro <[email protected]>
Cc: [email protected]
Signed-off-by: Ming Lei <[email protected]>
---
fs/direct-io.c | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/fs/direct-io.c b/fs/direct-io.c
index 3997023..bc3212ee 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -122,6 +122,7 @@ struct dio {
int page_errors; /* errno from get_user_pages() */
int is_async; /* is IO async ? */
bool defer_completion; /* defer AIO completion to workqueue? */
+ bool should_dirty; /* if page should be dirty */
int io_error; /* IO error in completion path */
unsigned long refcount; /* direct_io_worker() and bios */
struct bio *bio_list; /* singly linked via bi_private */
@@ -397,7 +398,7 @@ static inline void dio_bio_submit(struct dio *dio, struct dio_submit *sdio)
dio->refcount++;
spin_unlock_irqrestore(&dio->bio_lock, flags);

- if (dio->is_async && dio->rw == READ)
+ if (dio->is_async && dio->rw == READ && dio->should_dirty)
bio_set_pages_dirty(bio);

if (sdio->submit_io)
@@ -468,13 +469,14 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio)
if (!uptodate)
dio->io_error = -EIO;

- if (dio->is_async && dio->rw == READ) {
+ if (dio->is_async && dio->rw == READ && dio->should_dirty) {
bio_check_pages_dirty(bio); /* transfers ownership */
} else {
bio_for_each_segment_all(bvec, bio, i) {
struct page *page = bvec->bv_page;

- if (dio->rw == READ && !PageCompound(page))
+ if (dio->rw == READ && !PageCompound(page) &&
+ dio->should_dirty)
set_page_dirty_lock(page);
page_cache_release(page);
}
@@ -1217,6 +1219,7 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,

spin_lock_init(&dio->bio_lock);
dio->refcount = 1;
+ dio->should_dirty = !is_kernel_kiocb(iocb);

sdio.iter = iter;
sdio.final_block_in_request =
--
1.7.9.5

2014-07-23 22:59:44

by Ming Lei

Subject: [PATCH 9/9] block: loop: support to submit I/O via kernel aio based

Part of the patch is based on Dave's previous post.

It is easy to observe that loop block device throughput
can be increased by more than 100% in a single-job randread
fio test (libaio engine, direct I/O).

Cc: Zach Brown <[email protected]>
Cc: Dave Kleikamp <[email protected]>
Cc: Benjamin LaHaise <[email protected]>
Signed-off-by: Ming Lei <[email protected]>
---
drivers/block/loop.c | 128 ++++++++++++++++++++++++++++++++++++++++++---
drivers/block/loop.h | 1 +
include/uapi/linux/loop.h | 1 +
3 files changed, 122 insertions(+), 8 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 96a8913..2279d26 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -76,6 +76,7 @@
#include <linux/miscdevice.h>
#include <linux/falloc.h>
#include <linux/blk-mq.h>
+#include <linux/aio.h>
#include "loop.h"

#include <asm/uaccess.h>
@@ -451,22 +452,112 @@ static int lo_req_flush(struct loop_device *lo, struct request *rq)
return ret;
}

-static int do_bio_filebacked(struct loop_device *lo, struct request *rq)
+#ifdef CONFIG_AIO
+static void lo_rw_aio_complete(u64 data, long res)
+{
+ struct loop_cmd *cmd = (struct loop_cmd *)(uintptr_t)data;
+ struct request *rq = cmd->rq;
+
+ if (res > 0)
+ res = 0;
+ else if (res < 0)
+ res = -EIO;
+
+ rq->errors = res;
+ aio_kernel_free(cmd->iocb);
+ blk_mq_complete_request(rq);
+}
+
+static int lo_rw_aio(struct loop_device *lo, struct loop_cmd *cmd,
+ bool write, loff_t pos)
+{
+ struct file *file = lo->lo_backing_file;
+ struct request *rq = cmd->rq;
+ struct kiocb *iocb;
+ unsigned int op, i = 0;
+ struct iov_iter iter;
+ struct bio_vec *bvec, bv;
+ size_t nr_segs = 0;
+ struct req_iterator r_iter;
+ int ret = -EIO;
+
+ /* how many segments */
+ rq_for_each_segment(bv, rq, r_iter)
+ nr_segs++;
+
+ iocb = aio_kernel_alloc(GFP_NOIO, nr_segs * sizeof(*bvec));
+ if (!iocb) {
+ ret = -ENOMEM;
+ goto failed;
+ }
+
+ cmd->iocb = iocb;
+ bvec = (struct bio_vec *)(iocb + 1);
+ rq_for_each_segment(bv, rq, r_iter)
+ bvec[i++] = bv;
+
+ if (write)
+ op = IOCB_CMD_WRITE_ITER;
+ else
+ op = IOCB_CMD_READ_ITER;
+
+ iter.type = ITER_BVEC | (write ? WRITE : 0);
+ iter.bvec = bvec;
+ iter.nr_segs = nr_segs;
+ iter.count = blk_rq_bytes(rq);
+ iter.iov_offset = 0;
+
+ aio_kernel_init_rw(iocb, file, iov_iter_count(&iter), pos,
+ lo_rw_aio_complete, (u64)(uintptr_t)cmd);
+ ret = aio_kernel_submit(iocb, op, &iter);
+
+ /*
+ * Use the same policy as userspace aio: the req may have been
+ * completed already, so release it via the aio completion path.
+ */
+ if (ret != -EIOCBQUEUED)
+ lo_rw_aio_complete((u64)(uintptr_t)cmd, ret);
+ return 0;
+ failed:
+ return ret;
+}
+#endif /* CONFIG_AIO */
+
+static int lo_io_rw(struct loop_device *lo, struct loop_cmd *cmd,
+ bool write, loff_t pos)
+{
+#ifdef CONFIG_AIO
+ if (lo->lo_flags & LO_FLAGS_USE_AIO)
+ return lo_rw_aio(lo, cmd, write, pos);
+#endif
+ if (write)
+ return lo_send(lo, cmd->rq, pos);
+ else
+ return lo_receive(lo, cmd->rq, lo->lo_blocksize, pos);
+}
+
+static int do_bio_filebacked(struct loop_device *lo,
+ struct loop_cmd *cmd, bool *sync)
{
loff_t pos;
int ret;
+ struct request *rq = cmd->rq;

+ *sync = false;
pos = ((loff_t) blk_rq_pos(rq) << 9) + lo->lo_offset;

if (rq->cmd_flags & REQ_WRITE) {
- if (rq->cmd_flags & REQ_FLUSH)
+ if (rq->cmd_flags & REQ_FLUSH) {
ret = lo_req_flush(lo, rq);
- else if (rq->cmd_flags & REQ_DISCARD)
+ *sync = true;
+ } else if (rq->cmd_flags & REQ_DISCARD) {
ret = lo_discard(lo, rq, pos);
- else
- ret = lo_send(lo, rq, pos);
+ *sync = true;
+ } else {
+ ret = lo_io_rw(lo, cmd, true, pos);
+ }
} else
- ret = lo_receive(lo, rq, lo->lo_blocksize, pos);
+ ret = lo_io_rw(lo, cmd, false, pos);

return ret;
}
@@ -771,6 +862,14 @@ static int loop_set_fd(struct loop_device *lo, fmode_t mode,
!file->f_op->write)
lo_flags |= LO_FLAGS_READ_ONLY;

+#ifdef CONFIG_AIO
+ if (file->f_op->write_iter && file->f_op->read_iter &&
+ mapping->a_ops->direct_IO) {
+ file->f_flags |= O_DIRECT;
+ lo_flags |= LO_FLAGS_USE_AIO;
+ }
+#endif
+
lo_blocksize = S_ISBLK(inode->i_mode) ?
inode->i_bdev->bd_block_size : PAGE_SIZE;

@@ -804,6 +903,17 @@ static int loop_set_fd(struct loop_device *lo, fmode_t mode,

set_blocksize(bdev, lo_blocksize);

+#ifdef CONFIG_AIO
+ /*
+ * We must not send too-small direct-io requests, so we reflect
+ * the minimum io size to the loop device's logical block size
+ */
+ if ((lo_flags & LO_FLAGS_USE_AIO) && inode->i_sb->s_bdev)
+ blk_queue_logical_block_size(lo->lo_queue,
+ bdev_io_min(inode->i_sb->s_bdev));
+#endif
+
+
lo->lo_state = Lo_bound;
if (part_shift)
lo->lo_flags |= LO_FLAGS_PARTSCAN;
@@ -1503,6 +1613,7 @@ static void loop_queue_work(struct work_struct *work)
const bool write = cmd->rq->cmd_flags & REQ_WRITE;
struct loop_device *lo = cmd->lo;
int ret = -EIO;
+ bool sync = true;

if (lo->lo_state != Lo_bound)
goto failed;
@@ -1510,12 +1621,13 @@ static void loop_queue_work(struct work_struct *work)
if (write && (lo->lo_flags & LO_FLAGS_READ_ONLY))
goto failed;

- ret = do_bio_filebacked(lo, cmd->rq);
+ ret = do_bio_filebacked(lo, cmd, &sync);

failed:
if (ret)
cmd->rq->errors = -EIO;
- blk_mq_complete_request(cmd->rq);
+ if (!(lo->lo_flags & LO_FLAGS_USE_AIO) || sync || ret)
+ blk_mq_complete_request(cmd->rq);
}

static int loop_init_request(void *data, struct request *rq,
diff --git a/drivers/block/loop.h b/drivers/block/loop.h
index be796c7..4004af5 100644
--- a/drivers/block/loop.h
+++ b/drivers/block/loop.h
@@ -65,6 +65,7 @@ struct loop_cmd {
struct work_struct work;
struct request *rq;
struct loop_device *lo;
+ struct kiocb *iocb;
};

/* Support for loadable transfer modules */
diff --git a/include/uapi/linux/loop.h b/include/uapi/linux/loop.h
index e0cecd2..6edc6b6 100644
--- a/include/uapi/linux/loop.h
+++ b/include/uapi/linux/loop.h
@@ -21,6 +21,7 @@ enum {
LO_FLAGS_READ_ONLY = 1,
LO_FLAGS_AUTOCLEAR = 4,
LO_FLAGS_PARTSCAN = 8,
+ LO_FLAGS_USE_AIO = 16,
};

#include <asm/posix_types.h> /* for __kernel_old_dev_t */
--
1.7.9.5

2014-07-23 23:16:05

by Zach Brown

Subject: Re: [PATCH 7/9] aio: add aio_kernel_() interface

On Thu, Jul 24, 2014 at 06:55:28AM +0800, Ming Lei wrote:
> From: Dave Kleikamp <[email protected]>
>
> This adds an interface that lets kernel callers submit aio iocbs without
> going through the user space syscalls. This lets kernel callers avoid
> the management limits and overhead of the context. It will also let us
> integrate aio operations with other kernel apis that the user space
> interface doesn't have access to.
>
> This patch is based on Dave's posts in the links below:
>
> https://lkml.org/lkml/2013/10/16/365
> https://groups.google.com/forum/#!topic/linux.kernel/l7mogGJZoKQ

This was originally written a billion years ago when dinosaurs roamed
the earth. Also, notably, before Kent and Ben reworked a bunch of the
aio core. I'd want them to take a look at this patch to make sure that
it doesn't rely on any assumptions that have changed.

> +/* opcode values not exposed to user space */
> +enum {
> + IOCB_CMD_READ_ITER = 0x10000,
> + IOCB_CMD_WRITE_ITER = 0x10001,
> +};

And I think the consensus was that this isn't good enough. Find a way
to encode the kernel caller ops without polluting the uiocb cmd name
space.

(I've now come to think that this entire approach of having loop use aio
is misguided and that the way forward is to have dio consume what loop
naturally produces -- bios, blk-mq rqs, whatever -- but I'm on to other
things these days.)

- z

2014-07-24 01:57:21

by Ming Lei

Subject: Re: [PATCH 7/9] aio: add aio_kernel_() interface

On Thu, Jul 24, 2014 at 7:16 AM, Zach Brown <[email protected]> wrote:
> On Thu, Jul 24, 2014 at 06:55:28AM +0800, Ming Lei wrote:
>> From: Dave Kleikamp <[email protected]>
>>
>> This adds an interface that lets kernel callers submit aio iocbs without
>> going through the user space syscalls. This lets kernel callers avoid
>> the management limits and overhead of the context. It will also let us
>> integrate aio operations with other kernel apis that the user space
>> interface doesn't have access to.
>>
>> This patch is based on Dave's posts in the links below:
>>
>> https://lkml.org/lkml/2013/10/16/365
>> https://groups.google.com/forum/#!topic/linux.kernel/l7mogGJZoKQ
>
> This was originally written a billion years ago when dinosaurs roamed
> the earth. Also, notably, before Kent and Ben reworked a bunch of the

Not so long ago: this patch is based on Dave's last version (V9), which
was posted in Oct 2013, :-)

> aio core. I'd want them to take a look at this patch to make sure that
> it doesn't rely on any assumptions that have changed.

Looks like I forgot to Cc Kent, :-(

>
>> +/* opcode values not exposed to user space */
>> +enum {
>> + IOCB_CMD_READ_ITER = 0x10000,
>> + IOCB_CMD_WRITE_ITER = 0x10001,
>> +};
>
> And I think the consensus was that this isn't good enough. Find a way
> to encode the kernel caller ops without polluting the uiocb cmd name
> space.

That is easy: since the two cmd names are only for kernel AIO, almost
anything should be OK, but it looks like I didn't see such a comment.

>
> (I've now come to think that this entire approach of having loop use aio
> is misguided and that the way forward is to have dio consume what loop
> naturally produces -- bios, blk-mq rqs, whatever -- but I'm on to other

Yes, that is what these patches are doing, and actually AIO's
model is a good match for the driver interface. Lots of drivers
use the asynchronous model (submit, complete, ...).

> things these days.)

At least, loop can improve its throughput a lot with kernel AIO,
without big changes to fs/direct-io (thanks largely to ITER_BVEC),
and vhost-scsi should benefit from it too.

Thanks,

2014-07-24 03:08:52

by Ming Lei

Subject: Re: [PATCH 9/9] block: loop: support to submit I/O via kernel aio based

On Thu, Jul 24, 2014 at 6:55 AM, Ming Lei <[email protected]> wrote:
> Part of the patch is based on Dave's previous post.
>
> It is easy to observe that loop block device throughput
> can be increased by more than 100% in a single-job randread
> fio test (libaio engine, direct I/O).
>
> Cc: Zach Brown <[email protected]>
> Cc: Dave Kleikamp <[email protected]>
> Cc: Benjamin LaHaise <[email protected]>
> Signed-off-by: Ming Lei <[email protected]>
> ---
> drivers/block/loop.c | 128 ++++++++++++++++++++++++++++++++++++++++++---
> drivers/block/loop.h | 1 +
> include/uapi/linux/loop.h | 1 +
> 3 files changed, 122 insertions(+), 8 deletions(-)
>
> diff --git a/drivers/block/loop.c b/drivers/block/loop.c
> index 96a8913..2279d26 100644
> --- a/drivers/block/loop.c
> +++ b/drivers/block/loop.c

> +static int lo_rw_aio(struct loop_device *lo, struct loop_cmd *cmd,
> + bool write, loff_t pos)
> +{
> + struct file *file = lo->lo_backing_file;
> + struct request *rq = cmd->rq;
> + struct kiocb *iocb;
> + unsigned int op, i = 0;
> + struct iov_iter iter;
> + struct bio_vec *bvec, bv;
> + size_t nr_segs = 0;
> + struct req_iterator r_iter;
> + int ret = -EIO;
> +
> + /* how many segments */
> + rq_for_each_segment(bv, rq, r_iter)
> + nr_segs++;
> +
> + iocb = aio_kernel_alloc(GFP_NOIO, nr_segs * sizeof(*bvec));
> + if (!iocb) {
> + ret = -ENOMEM;
> + goto failed;
> + }
> +
> + cmd->iocb = iocb;
> + bvec = (struct bio_vec *)(iocb + 1);
> + rq_for_each_segment(bv, rq, r_iter)
> + bvec[i++] = bv;
> +
> + if (write)
> + op = IOCB_CMD_WRITE_ITER;
> + else
> + op = IOCB_CMD_READ_ITER;
> +
> + iter.type = ITER_BVEC | (write ? WRITE : 0);
> + iter.bvec = bvec;
> + iter.nr_segs = nr_segs;
> + iter.count = blk_rq_bytes(rq);
> + iter.iov_offset = 0;
> +
> + aio_kernel_init_rw(iocb, file, iov_iter_count(&iter), pos,
> + lo_rw_aio_complete, (u64)(uintptr_t)cmd);
> + ret = aio_kernel_submit(iocb, op, &iter);
> +
> + /*
> + * Use the same policy as userspace aio: the req may have been
> + * completed already, so release it via the aio completion path.
> + */
> + if (ret != -EIOCBQUEUED)
> + lo_rw_aio_complete((u64)(uintptr_t)cmd, ret);

The above stuff should have been put inside aio_kernel_submit();
I will do it in V1.

Thanks,

2014-07-25 01:25:50

by Ming Lei

Subject: Re: [PATCH 7/9] aio: add aio_kernel_() interface

On Thu, Jul 24, 2014 at 11:16 PM, Dave Kleikamp
<[email protected]> wrote:
> On 07/23/2014 08:57 PM, Ming Lei wrote:
>> On Thu, Jul 24, 2014 at 7:16 AM, Zach Brown <[email protected]> wrote:
>>> On Thu, Jul 24, 2014 at 06:55:28AM +0800, Ming Lei wrote:
>>>> From: Dave Kleikamp <[email protected]>
>>>>
>>>> This adds an interface that lets kernel callers submit aio iocbs without
>>>> going through the user space syscalls. This lets kernel callers avoid
>>>> the management limits and overhead of the context. It will also let us
>>>> integrate aio operations with other kernel apis that the user space
>>>> interface doesn't have access to.
>>>>
>>>> This patch is based on Dave's posts in the links below:
>>>>
>>>> https://lkml.org/lkml/2013/10/16/365
>>>> https://groups.google.com/forum/#!topic/linux.kernel/l7mogGJZoKQ
>>>
>>> This was originally written a billion years ago when dinosaurs roamed
>>> the earth. Also, notably, before Kent and Ben reworked a bunch of the
>>
>> Not so long ago: this patch is based on Dave's last version (V9), which
>> was posted in Oct 2013, :-)
>
> Which was based on a much earlier patch from Zach. I regret that I left
> aio_kernel_submit entangled with aio_run_iocb when I reworked his patches.
>
>>> aio core. I'd want them to take a look at this patch to make sure that
>>> it doesn't rely on any assumptions that have changed.
>>
>> Looks like I forgot to Cc Kent, :-(
>>
>>>
>>>> +/* opcode values not exposed to user space */
>>>> +enum {
>>>> + IOCB_CMD_READ_ITER = 0x10000,
>>>> + IOCB_CMD_WRITE_ITER = 0x10001,
>>>> +};
>>>
>>> And I think the consensus was that this isn't good enough. Find a way
>>> to encode the kernel caller ops without polluting the uiocb cmd name
>>> space.
>>
>> That is easy: since the two cmd names are only for kernel AIO, almost
>> anything should be OK, but it looks like I didn't see such a comment.
>
> Agreed. These were added because the flags had been interpreted by
> aio_run_iocb(). I'm happy that is no longer the case.

We can remove the two cmd names completely and just use one
read/write flag; I will do it in V1.

>
>>>
>>> (I've now come to think that this entire approach of having loop use aio
>>> is misguided and that the way forward is to have dio consume what loop
>>> naturally produces -- bios, blk-mq rqs, whatever -- but I'm on to other
>>
>> Yes, that is what these patches are doing, and actually AIO's
>> model is a good match for the driver interface. Lots of drivers
>> use the asynchronous model (submit, complete, ...).
>>
>>> things these days.)
>>
>> At least, loop can improve its throughput a lot with kernel AIO,
>> without big changes to fs/direct-io (thanks largely to ITER_BVEC),
>> and vhost-scsi should benefit from it too.
>>
>> Thanks,
>>

2014-07-28 03:42:10

by Ming Lei

Subject: Re: [PATCH 9/9] block: loop: support to submit I/O via kernel aio based

Hi David,

Thanks for your interest in the patch.

On Sun, Jul 27, 2014 at 10:20 PM, David Horner <[email protected]> wrote:
> You mention > 100 % improvement.
>
> Can you help me to run some (micro) benchmarks that I could contribute to
> V1?

Sure, no problem. Generally I just run tests against the loop block
device directly, since I want to expose the performance improvement.
But I did run a test bench (mkfs, copying a big file and comparing,
...) over a filesystem mounted on the loop device too, and it looks
stable enough.

So your benchmarks can target either the loop block device or a
filesystem; both are fine.

> If you can direct me to relevant threads and docs that would be greatly
> appreciated.

Here is my test, in which I observed the 100% improvement
(throughput is doubled; actually it is more than doubled):

- losetup -f /dev/nullb0 # so /dev/loop0 will be backed by /dev/nullb0
# if it is the 1st loop device

- run the fio script [1] below with a single job to test direct randread on /dev/loop0

- this is just the single-job improvement; with more than one job,
throughput should increase even more thanks to the blk-mq conversion

I will send out V1 this week, and more test cases may be posted
too.


[1], fio test script
[global]
direct=1
size=32G
bsrange=4k-4k
timeout=20
numjobs=1
ioengine=libaio
iodepth=64
filename=/dev/loop0
group_reporting=1

[f]
rw=randread


Thanks,