2007-12-17 15:04:23

by Evgeniy Polyakov

[permalink] [raw]
Subject: [0/4] DST: Distributed storage.


Distributed storage.

I'm pleased to announce the 12'th release of the distributed
storage subsystem (DST).

DST allows to form a storage on top of local and remote nodes
and combine them into linear or mirroring setup, which in
turn can be exported to remote nodes.

Short changelog:
* new improved mirroring algorithm.
This algorithm uses sliding window approach for full resync
and write log for partial resync.
* fixed number of typos and debug cleanups
* update inode size when linear algorithm changes the size of the
storage in run time
* extended number of sysfs files and documentation for them
* fixed leak in local export node setup
* name is 'Dancing with the smoked neutrino' now

Overall list of features of the DST can be found on project's homepage:

http://tservice.net.ru/~s0mbre/old/?section=projects&item=dst

DST is also exported as a git tree available for clone and pull from
http://tservice.net.ru/~s0mbre/archive/dst/dst.git

Interested reader can test DST with 2.6.23 tree too
(it should compile fine, but was not tested).

DST passed all FS tests in LTP with XFS (modulo MAX_LOCK_DEPTH too low bug:
[ 8398.605691] BUG: MAX_LOCK_DEPTH too low!
[ 8398.609641] turning off the locking correctness validator.

this is not DST problem though), but it was not performed with
offline/online nodes.

Thank you.

Signed-off-by: Evgeniy Polyakov <[email protected]>


2007-12-17 15:04:49

by Evgeniy Polyakov

[permalink] [raw]
Subject: [2/4] DST: Core distributed storage files.


Core distributed storage files.
Include userspace interfaces, initialization,
block layer bindings and other core functionality.

Signed-off-by: Evgeniy Polyakov <[email protected]>


diff --git a/drivers/block/Kconfig b/drivers/block/Kconfig
index b4c8319..ca6592d 100644
--- a/drivers/block/Kconfig
+++ b/drivers/block/Kconfig
@@ -451,6 +451,8 @@ config ATA_OVER_ETH
This driver provides Support for ATA over Ethernet block
devices like the Coraid EtherDrive (R) Storage Blade.

+source "drivers/block/dst/Kconfig"
+
source "drivers/s390/block/Kconfig"

endmenu
diff --git a/drivers/block/Makefile b/drivers/block/Makefile
index dd88e33..fcf042d 100644
--- a/drivers/block/Makefile
+++ b/drivers/block/Makefile
@@ -29,3 +29,4 @@ obj-$(CONFIG_VIODASD) += viodasd.o
obj-$(CONFIG_BLK_DEV_SX8) += sx8.o
obj-$(CONFIG_BLK_DEV_UB) += ub.o

+obj-$(CONFIG_DST) += dst/
diff --git a/drivers/block/dst/Kconfig b/drivers/block/dst/Kconfig
new file mode 100644
index 0000000..67a7dad
--- /dev/null
+++ b/drivers/block/dst/Kconfig
@@ -0,0 +1,28 @@
+config DST
+ tristate "Distributed storage"
+ depends on NET
+ select CONNECTOR
+ select LIBCRC32C
+ ---help---
+ This driver allows to create a distributed storage.
+
+config DST_DEBUG
+ bool "DST debug"
+ depends on DST
+ ---help---
+ This option will turn HEAVY debugging of the DST.
+ Turn it on ONLY if you have to debug some really obscure problem.
+
+config DST_ALG_LINEAR
+ tristate "Linear distribution algorithm"
+ depends on DST
+ ---help---
+ This module allows to create linear mapping of the nodes
+ in the distributed storage.
+
+config DST_ALG_MIRROR
+ tristate "Mirror distribution algorithm"
+ depends on DST
+ ---help---
+ This module allows to create a mirror of the nodes in the
+ distributed storage.
diff --git a/drivers/block/dst/Makefile b/drivers/block/dst/Makefile
new file mode 100644
index 0000000..1400e94
--- /dev/null
+++ b/drivers/block/dst/Makefile
@@ -0,0 +1,6 @@
+obj-$(CONFIG_DST) += dst.o
+
+dst-y := dcore.o kst.o
+
+obj-$(CONFIG_DST_ALG_LINEAR) += alg_linear.o
+obj-$(CONFIG_DST_ALG_MIRROR) += alg_mirror.o
diff --git a/drivers/block/dst/dcore.c b/drivers/block/dst/dcore.c
new file mode 100644
index 0000000..423e7b2
--- /dev/null
+++ b/drivers/block/dst/dcore.c
@@ -0,0 +1,1622 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/blkdev.h>
+#include <linux/bio.h>
+#include <linux/slab.h>
+#include <linux/connector.h>
+#include <linux/socket.h>
+#include <linux/dst.h>
+#include <linux/device.h>
+#include <linux/in.h>
+#include <linux/in6.h>
+#include <linux/buffer_head.h>
+
+#include <net/sock.h>
+
+static LIST_HEAD(dst_storage_list);
+static LIST_HEAD(dst_alg_list);
+static DEFINE_MUTEX(dst_storage_lock);
+static DEFINE_MUTEX(dst_alg_lock);
+static int dst_major;
+static struct kst_worker *kst_main_worker;
+static struct cb_id cn_dst_id = { CN_DST_IDX, CN_DST_VAL };
+
+struct kmem_cache *dst_request_cache;
+
+static char dst_name[] = "Dancing with the smoked neutrino";
+
+/*
+ * DST sysfs tree. For device called 'storage' which is formed
+ * on top of two nodes this looks like this:
+ *
+ * /sys/devices/storage/
+ * /sys/devices/storage/alg : alg_linear
+ * /sys/devices/storage/n-800/type : R: 192.168.4.80:1025
+ * /sys/devices/storage/n-800/size : 800
+ * /sys/devices/storage/n-800/start : 800
+ * /sys/devices/storage/n-800/clean
+ * /sys/devices/storage/n-800/dirty
+ * /sys/devices/storage/n-0/type : R: 192.168.4.81:1025
+ * /sys/devices/storage/n-0/size : 800
+ * /sys/devices/storage/n-0/start : 0
+ * /sys/devices/storage/n-0/clean
+ * /sys/devices/storage/n-0/dirty
+ * /sys/devices/storage/remove_all_nodes
+ * /sys/devices/storage/nodes : sectors (start [size]): 0 [800] | 800 [800]
+ * /sys/devices/storage/name : storage
+ */
+
+static int dst_dev_match(struct device *dev, struct device_driver *drv)
+{
+ return 1;
+}
+
+static void dst_dev_release(struct device *dev)
+{
+}
+
+static struct bus_type dst_dev_bus_type = {
+ .name = "dst",
+ .match = &dst_dev_match,
+};
+
+static struct device dst_dev = {
+ .bus = &dst_dev_bus_type,
+ .release = &dst_dev_release
+};
+
+static void dst_node_release(struct device *dev)
+{
+}
+
+static struct device dst_node_dev = {
+ .release = &dst_node_release
+};
+
+static void dst_free_alg(struct dst_alg *alg)
+{
+ kfree(alg);
+}
+
+/*
+ * Algorithm is never freed directly,
+ * since its module reference counter is increased
+ * by storage when it is created - just like network protocols.
+ */
+static inline void dst_put_alg(struct dst_alg *alg)
+{
+ module_put(alg->ops->owner);
+ if (atomic_dec_and_test(&alg->refcnt))
+ dst_free_alg(alg);
+}
+
+static void dst_remove_disk(struct dst_storage *st)
+{
+ put_disk(st->disk);
+ blk_cleanup_queue(st->queue);
+}
+
+static void dst_free_storage(struct dst_storage *st)
+{
+ BUG_ON(rb_first(&st->tree_root) != NULL);
+
+ dst_remove_disk(st);
+ dst_put_alg(st->alg);
+ kfree(st);
+}
+
+static inline void dst_put_storage(struct dst_storage *st)
+{
+ if (atomic_dec_and_test(&st->refcnt))
+ dst_free_storage(st);
+}
+
+static struct bio_set *dst_bio_set;
+
+static void dst_destructor(struct bio *bio)
+{
+ bio_free(bio, dst_bio_set);
+}
+
+/*
+ * Internal callback for local requests (i.e. for local disk),
+ * which are splitted between nodes (part with local node destination
+ * ends up with this ->bi_end_io() callback).
+ */
+static int dst_end_io(struct bio *bio, unsigned int size, int err)
+{
+ struct bio *orig_bio = bio->bi_private;
+
+ if (bio->bi_size)
+ return 0;
+
+ dprintk("%s: bio: %p, orig_bio: %p, size: %u, orig_size: %u.\n",
+ __func__, bio, orig_bio, size, orig_bio->bi_size);
+
+ bio_endio(orig_bio, size, 0);
+ bio_put(bio);
+ return 0;
+}
+
+/*
+ * This function sends processing request down to block layer (for local node)
+ * or to network state machine (for remote node).
+ */
+static int dst_node_push(struct dst_request *req)
+{
+ int err = 0;
+ struct dst_node *n = req->node;
+
+ if (n->bdev) {
+ struct bio *bio = req->bio;
+
+ dprintk("%s: start: %llu, num: %d, idx: %d, offset: %u, "
+ "size: %llu, bi_idx: %d, bi_vcnt: %d.\n",
+ __func__, req->start, req->num, req->idx,
+ req->offset, req->size, bio->bi_idx, bio->bi_vcnt);
+
+ if (likely(bio->bi_idx == req->idx &&
+ bio->bi_vcnt == req->num)) {
+ bio->bi_bdev = n->bdev;
+ bio->bi_sector = req->start;
+ } else {
+ struct bio *clone = bio_alloc_bioset(GFP_NOIO,
+ bio->bi_max_vecs, dst_bio_set);
+ struct bio_vec *bv;
+
+ err = -ENOMEM;
+ if (!clone)
+ goto out_put;
+
+ __bio_clone(clone, bio);
+
+ bv = bio_iovec_idx(clone, req->idx);
+ bv->bv_offset += req->offset;
+ clone->bi_idx = req->idx;
+ clone->bi_vcnt = req->num;
+ clone->bi_bdev = n->bdev;
+ clone->bi_sector = req->start;
+ clone->bi_destructor = dst_destructor;
+ clone->bi_private = bio;
+ clone->bi_size = req->orig_size;
+ clone->bi_end_io = &dst_end_io;
+ req->bio = clone;
+
+ dprintk("%s: start: %llu, num: %d, idx: %d, "
+ "offset: %u, size: %llu, "
+ "bi_idx: %d, bi_vcnt: %d, req: %p, bio: %p.\n",
+ __func__, req->start, req->num, req->idx,
+ req->offset, req->size,
+ clone->bi_idx, clone->bi_vcnt, req, req->bio);
+
+ }
+ }
+
+ err = n->st->alg->ops->remap(req);
+
+out_put:
+ dst_node_put(n);
+ return err;
+}
+
+/*
+ * This function is invoked from block layer request processing function,
+ * its task is to remap block request to different nodes.
+ */
+static int dst_remap(struct dst_storage *st, struct bio *bio)
+{
+ struct dst_node *n;
+ int err = -EINVAL, i, cnt;
+ unsigned int bio_sectors = bio->bi_size>>9;
+ struct bio_vec *bv;
+ struct dst_request req;
+ u64 rest_in_node, start, total_size;
+
+ mutex_lock(&st->tree_lock);
+ n = dst_storage_tree_search(st, bio->bi_sector);
+ mutex_unlock(&st->tree_lock);
+
+ if (!n) {
+ dprintk("%s: failed to find a node for bio: %p, "
+ "sector: %llu.\n",
+ __func__, bio, (u64)bio->bi_sector);
+ return -ENODEV;
+ }
+
+ dprintk("%s: bio: %llu-%llu, dev: %llu-%llu, in sectors.\n",
+ __func__, (u64)bio->bi_sector, (u64)bio->bi_sector+bio_sectors,
+ n->start, n->start+n->size);
+
+ memset(&req, 0, sizeof(struct dst_request));
+
+ start = bio->bi_sector;
+ total_size = bio->bi_size;
+
+ dst_fill_request(&req, bio, start - n->start, n, &kst_bio_endio);
+
+ /*
+ * Common fast path - block request does not cross
+ * boundaries between nodes.
+ */
+ if (likely(bio->bi_sector + bio_sectors <= n->start + n->size))
+ return dst_node_push(&req);
+
+ req.size = 0;
+ req.idx = 0;
+ req.num = 1;
+
+ cnt = bio->bi_vcnt;
+
+ rest_in_node = to_bytes(n->size - req.start);
+
+ for (i = 0; i < cnt; ++i) {
+ bv = bio_iovec_idx(bio, i);
+
+ if (req.size + bv->bv_len >= rest_in_node) {
+ unsigned int diff = req.size + bv->bv_len -
+ rest_in_node;
+
+ req.size += bv->bv_len - diff;
+ req.start = start - n->start;
+ req.orig_size = req.size;
+ req.bio = bio;
+ req.bio_endio = &kst_bio_endio;
+
+ dprintk("%s: split: start: %llu/%llu, size: %llu, "
+ "total_size: %llu, diff: %u, idx: %d, "
+ "num: %d, bv_len: %u, bv_offset: %u.\n",
+ __func__, start, req.start, req.size,
+ total_size, diff, req.idx, req.num,
+ bv->bv_len, bv->bv_offset);
+
+ err = dst_node_push(&req);
+ if (err)
+ break;
+
+ total_size -= req.orig_size;
+
+ if (!total_size)
+ break;
+
+ start += to_sector(req.orig_size);
+
+ req.flags = (test_bit(DST_NODE_FROZEN, &n->flags))?
+ DST_REQ_ALWAYS_QUEUE:0;
+ req.orig_size = req.size = diff;
+
+ if (diff) {
+ req.offset = bv->bv_len - diff;
+ req.idx = req.num - 1;
+ } else {
+ req.idx = req.num;
+ req.offset = 0;
+ }
+
+ dprintk("%s: next: start: %llu, size: %llu, "
+ "total_size: %llu, diff: %u, idx: %d, "
+ "num: %d, offset: %u, bv_len: %u, "
+ "bv_offset: %u.\n",
+ __func__, start, req.size, total_size, diff,
+ req.idx, req.num, req.offset,
+ bv->bv_len, bv->bv_offset);
+
+ mutex_lock(&st->tree_lock);
+ n = dst_storage_tree_search(st, start);
+ mutex_unlock(&st->tree_lock);
+
+ if (!n) {
+ err = -ENODEV;
+ dprintk("%s: failed to find a split node for "
+ "bio: %p, sector: %llu, start: %llu.\n",
+ __func__, bio, (u64)bio->bi_sector,
+ req.start);
+ break;
+ }
+
+ req.state = n->state;
+ req.node = n;
+ req.start = start - n->start;
+ rest_in_node = to_bytes(n->size - req.start);
+
+ dprintk("%s: req.start: %llu, start: %llu, "
+ "dev_start: %llu, dev_size: %llu, "
+ "rest_in_node: %llu.\n",
+ __func__, req.start, start, n->start,
+ n->size, rest_in_node);
+ } else {
+ req.size += bv->bv_len;
+ req.num++;
+ }
+ }
+
+ dprintk("%s: last request: start: %llu, size: %llu, "
+ "total_size: %llu.\n", __func__,
+ req.start, req.size, total_size);
+ if (total_size) {
+ req.orig_size = req.size;
+ req.bio = bio;
+ req.bio_endio = &kst_bio_endio;
+
+ dprintk("%s: last: start: %llu/%llu, size: %llu, "
+ "total_size: %llu, idx: %d, num: %d.\n",
+ __func__, start, req.start, req.size,
+ total_size, req.idx, req.num);
+
+ err = dst_node_push(&req);
+ if (!err) {
+ total_size -= req.orig_size;
+
+ BUG_ON(total_size != 0);
+ }
+ }
+
+ dprintk("%s: end bio: %p, err: %d.\n", __func__, bio, err);
+ return err;
+}
+
+/*
+ * Distributed storage erquest processing function.
+ * It calls algorithm spcific remapping code only.
+ */
+static int dst_request(struct request_queue *q, struct bio *bio)
+{
+ struct dst_storage *st = q->queuedata;
+ int err;
+
+ dprintk("\n%s: start: st: %p, bio: %p, cnt: %u.\n",
+ __func__, st, bio, bio->bi_vcnt);
+
+ err = dst_remap(st, bio);
+ if (err)
+ bio_endio(bio, bio->bi_size, err);
+
+ dprintk("%s: end: st: %p, bio: %p, err: %d.\n",
+ __func__, st, bio, err);
+ return 0;
+}
+
+static void dst_unplug(struct request_queue *q)
+{
+}
+
+static int dst_flush(struct request_queue *q, struct gendisk *disk, sector_t *sec)
+{
+ return 0;
+}
+
+static int dst_blk_open(struct inode *inode, struct file *file)
+{
+ struct dst_storage *st = inode->i_bdev->bd_disk->private_data;
+
+ atomic_inc(&st->refcnt);
+ return 0;
+}
+
+static int dst_blk_release(struct inode *inode, struct file *file)
+{
+ struct dst_storage *st = inode->i_bdev->bd_disk->private_data;
+
+ dst_put_storage(st);
+ return 0;
+}
+
+static struct block_device_operations dst_blk_ops = {
+ .open = &dst_blk_open,
+ .release = &dst_blk_release,
+ .owner = THIS_MODULE,
+};
+
+/*
+ * Block layer binding - disk is created when array is fully configured
+ * by userspace request.
+ */
+static int dst_create_disk(struct dst_storage *st)
+{
+ int err = -ENOMEM;
+
+ st->queue = blk_alloc_queue(GFP_KERNEL);
+ if (!st->queue)
+ goto err_out_exit;
+
+ st->queue->queuedata = st;
+ blk_queue_make_request(st->queue, dst_request);
+ blk_queue_bounce_limit(st->queue, BLK_BOUNCE_ANY);
+ st->queue->unplug_fn = dst_unplug;
+ st->queue->issue_flush_fn = dst_flush;
+
+ err = -EINVAL;
+ st->disk = alloc_disk(1);
+ if (!st->disk)
+ goto err_out_free_queue;
+
+ st->disk->major = dst_major;
+ st->disk->first_minor = (((unsigned long)st->disk) ^
+ (((unsigned long)st->disk) >> 31)) & 0xff;
+ st->disk->fops = &dst_blk_ops;
+ st->disk->queue = st->queue;
+ st->disk->private_data = st;
+ snprintf(st->disk->disk_name, sizeof(st->disk->disk_name),
+ "dst-%s-%d", st->name, st->disk->first_minor);
+
+ return 0;
+
+err_out_free_queue:
+ blk_cleanup_queue(st->queue);
+err_out_exit:
+ return err;
+}
+
+/*
+ * Shows node name in sysfs.
+ */
+static ssize_t dst_name_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct dst_storage *st = container_of(dev, struct dst_storage, device);
+
+ return sprintf(buf, "%s\n", st->name);
+}
+
+static void dst_remove_all_nodes(struct dst_storage *st)
+{
+ struct dst_node *n, *node, *tmp;
+ struct rb_node *rb_node;
+
+ mutex_lock(&st->tree_lock);
+ while ((rb_node = rb_first(&st->tree_root)) != NULL) {
+ n = rb_entry(rb_node, struct dst_node, tree_node);
+ rb_erase(&n->tree_node, &st->tree_root);
+ if (!n->shared_head && atomic_read(&n->shared_num)) {
+ list_for_each_entry_safe(node, tmp, &n->shared, shared) {
+ list_del(&node->shared);
+ atomic_dec(&n->shared_num);
+ dst_node_put(node->shared_head);
+ node->shared_head = NULL;
+ dst_node_put(node);
+ }
+ }
+ dst_node_put(n);
+ }
+ mutex_unlock(&st->tree_lock);
+}
+
+/*
+ * Shows node layout in syfs.
+ */
+static ssize_t dst_nodes_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct dst_storage *st = container_of(dev, struct dst_storage, device);
+ int size = PAGE_CACHE_SIZE, sz;
+ struct dst_node *n;
+ struct rb_node *rb_node;
+
+ sz = sprintf(buf, "sectors (start [size]): ");
+ size -= sz;
+ buf += sz;
+
+ mutex_lock(&st->tree_lock);
+ for (rb_node = rb_first(&st->tree_root); rb_node;
+ rb_node = rb_next(rb_node)) {
+ n = rb_entry(rb_node, struct dst_node, tree_node);
+ if (size < 32)
+ break;
+ sz = sprintf(buf, "%llu [%llu]", n->start, n->size);
+ buf += sz;
+ size -= sz;
+
+ if (!rb_next(rb_node))
+ break;
+
+ sz = sprintf(buf, " | ");
+ buf += sz;
+ size -= sz;
+ }
+ mutex_unlock(&st->tree_lock);
+ size -= sprintf(buf, "\n");
+ return PAGE_CACHE_SIZE - size;
+}
+
+/*
+ * Algorithm currently being used by given storage.
+ */
+static ssize_t dst_alg_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct dst_storage *st = container_of(dev, struct dst_storage, device);
+ return sprintf(buf, "%s\n", st->alg->name);
+}
+
+/*
+ * Writing to this sysfs file allows to remove all nodes
+ * and storage itself automatically.
+ */
+static ssize_t dst_remove_nodes(struct device *dev,
+ struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct dst_storage *st = container_of(dev, struct dst_storage, device);
+ dst_remove_all_nodes(st);
+ return count;
+}
+
+static DEVICE_ATTR(name, 0444, dst_name_show, NULL);
+static DEVICE_ATTR(nodes, 0444, dst_nodes_show, NULL);
+static DEVICE_ATTR(alg, 0444, dst_alg_show, NULL);
+static DEVICE_ATTR(remove_all_nodes, 0644, NULL, dst_remove_nodes);
+
+static int dst_create_storage_attributes(struct dst_storage *st)
+{
+ int err;
+
+ err = device_create_file(&st->device, &dev_attr_name);
+ err = device_create_file(&st->device, &dev_attr_nodes);
+ err = device_create_file(&st->device, &dev_attr_alg);
+ err = device_create_file(&st->device, &dev_attr_remove_all_nodes);
+ return 0;
+}
+
+static void dst_remove_storage_attributes(struct dst_storage *st)
+{
+ device_remove_file(&st->device, &dev_attr_name);
+ device_remove_file(&st->device, &dev_attr_nodes);
+ device_remove_file(&st->device, &dev_attr_alg);
+ device_remove_file(&st->device, &dev_attr_remove_all_nodes);
+}
+
+static void dst_storage_sysfs_exit(struct dst_storage *st)
+{
+ dst_remove_storage_attributes(st);
+ device_unregister(&st->device);
+}
+
+static int dst_storage_sysfs_init(struct dst_storage *st)
+{
+ int err;
+
+ memcpy(&st->device, &dst_dev, sizeof(struct device));
+ snprintf(st->device.bus_id, sizeof(st->device.bus_id), "%s", st->name);
+
+ err = device_register(&st->device);
+ if (err) {
+ dprintk(KERN_ERR "Failed to register dst device %s, err: %d.\n",
+ st->name, err);
+ goto err_out_exit;
+ }
+
+ dst_create_storage_attributes(st);
+
+ return 0;
+
+err_out_exit:
+ return err;
+}
+
+/*
+ * This functions shows size and start of the appropriate node.
+ * Both are in sectors.
+ */
+static ssize_t dst_show_start(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+
+ return sprintf(buf, "%llu\n", n->start);
+}
+
+static ssize_t dst_show_size(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+
+ return sprintf(buf, "%llu\n", n->size);
+}
+
+/*
+ * This function shows node's flags in hex.
+ */
+static ssize_t dst_show_flags(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+
+ return sprintf(buf, "0x%lx\n", n->flags);
+}
+
+/*
+ * Shows type of the remote node - device major/minor number
+ * for local nodes and address (af_inet ipv4/ipv6 only) for remote nodes.
+ */
+static ssize_t dst_show_type(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+ struct sockaddr addr;
+ struct socket *sock;
+ int addrlen;
+
+ if (!n->state && !n->bdev)
+ return 0;
+
+ if (n->bdev)
+ return sprintf(buf, "L: %d:%d\n",
+ MAJOR(n->bdev->bd_dev), MINOR(n->bdev->bd_dev));
+
+ sock = n->state->socket;
+ if (sock->ops->getname(sock, &addr, &addrlen, 2))
+ return 0;
+
+ if (sock->ops->family == AF_INET) {
+ struct sockaddr_in *sin = (struct sockaddr_in *)&addr;
+ return sprintf(buf, "R: %u.%u.%u.%u:%d\n",
+ NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
+ } else if (sock->ops->family == AF_INET6) {
+ struct sockaddr_in6 *sin = (struct sockaddr_in6 *)&addr;
+ return sprintf(buf,
+ "R: %04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d\n",
+ NIP6(sin->sin6_addr), ntohs(sin->sin6_port));
+ }
+ return 0;
+}
+
+static struct device_attribute dst_node_attrs[] = {
+ __ATTR(start, 0444, dst_show_start, NULL),
+ __ATTR(size, 0444, dst_show_size, NULL),
+ __ATTR(flags, 0444, dst_show_flags, NULL),
+ __ATTR(type, 0444, dst_show_type, NULL),
+};
+
+static int dst_create_node_attributes(struct dst_node *n)
+{
+ int err, i;
+
+ for (i=0; i<ARRAY_SIZE(dst_node_attrs); ++i)
+ err = device_create_file(&n->device,
+ &dst_node_attrs[i]);
+ return 0;
+}
+
+static void dst_remove_node_attributes(struct dst_node *n)
+{
+ int i;
+
+ for (i=0; i<ARRAY_SIZE(dst_node_attrs); ++i)
+ device_remove_file(&n->device,
+ &dst_node_attrs[i]);
+}
+
+static void dst_node_sysfs_exit(struct dst_node *n)
+{
+ if (n->device.parent == &n->st->device) {
+ dst_remove_node_attributes(n);
+ device_unregister(&n->device);
+ n->device.parent = NULL;
+ }
+}
+
+static int dst_node_sysfs_init(struct dst_node *n)
+{
+ int err;
+
+ memcpy(&n->device, &dst_node_dev, sizeof(struct device));
+
+ n->device.parent = &n->st->device;
+
+ snprintf(n->device.bus_id, sizeof(n->device.bus_id),
+ "n-%llu-%p", n->start, n);
+ err = device_register(&n->device);
+ if (err) {
+ dprintk(KERN_ERR "Failed to register node, err: %d.\n", err);
+ goto err_out_exit;
+ }
+
+ dst_create_node_attributes(n);
+
+ return 0;
+
+err_out_exit:
+ n->device.parent = NULL;
+ return err;
+}
+
+/*
+ * Gets a reference for given storage, if
+ * storage with given name and algorithm being used
+ * does not exist it is created.
+ */
+static struct dst_storage *dst_get_storage(char *name, char *aname, int alloc)
+{
+ struct dst_storage *st, *rst = NULL;
+ int err;
+ struct dst_alg *alg;
+
+ mutex_lock(&dst_storage_lock);
+ list_for_each_entry(st, &dst_storage_list, entry) {
+ if (!strcmp(name, st->name) && !strcmp(st->alg->name, aname)) {
+ rst = st;
+ atomic_inc(&st->refcnt);
+ break;
+ }
+ }
+
+ if (rst || !alloc) {
+ mutex_unlock(&dst_storage_lock);
+ return rst;
+ }
+
+ st = kzalloc(sizeof(struct dst_storage), GFP_KERNEL);
+ if (!st) {
+ mutex_unlock(&dst_storage_lock);
+ return NULL;
+ }
+
+ mutex_init(&st->tree_lock);
+ /*
+ * One for storage itself,
+ * another one for attached node below.
+ */
+ atomic_set(&st->refcnt, 2);
+ snprintf(st->name, DST_NAMELEN, "%s", name);
+ st->tree_root.rb_node = NULL;
+
+ err = dst_storage_sysfs_init(st);
+ if (err)
+ goto err_out_free;
+
+ err = dst_create_disk(st);
+ if (err)
+ goto err_out_sysfs_exit;
+
+ mutex_lock(&dst_alg_lock);
+ list_for_each_entry(alg, &dst_alg_list, entry) {
+ if (!strcmp(alg->name, aname)) {
+ atomic_inc(&alg->refcnt);
+ try_module_get(alg->ops->owner);
+ st->alg = alg;
+ break;
+ }
+ }
+ mutex_unlock(&dst_alg_lock);
+
+ if (!st->alg)
+ goto err_out_disk_remove;
+
+ list_add_tail(&st->entry, &dst_storage_list);
+ mutex_unlock(&dst_storage_lock);
+
+ return st;
+
+err_out_disk_remove:
+ dst_remove_disk(st);
+err_out_sysfs_exit:
+ dst_storage_sysfs_exit(st);
+err_out_free:
+ mutex_unlock(&dst_storage_lock);
+ kfree(st);
+ return NULL;
+}
+
+/*
+ * Allows to allocate and add new algorithm by external modules.
+ */
+struct dst_alg *dst_alloc_alg(char *name, struct dst_alg_ops *ops)
+{
+ struct dst_alg *alg;
+
+ alg = kzalloc(sizeof(struct dst_alg), GFP_KERNEL);
+ if (!alg)
+ return NULL;
+ snprintf(alg->name, DST_NAMELEN, "%s", name);
+ atomic_set(&alg->refcnt, 1);
+ alg->ops = ops;
+
+ mutex_lock(&dst_alg_lock);
+ list_add_tail(&alg->entry, &dst_alg_list);
+ mutex_unlock(&dst_alg_lock);
+
+ return alg;
+}
+EXPORT_SYMBOL_GPL(dst_alloc_alg);
+
+/*
+ * Removing algorithm from main list of supported algorithms.
+ */
+void dst_remove_alg(struct dst_alg *alg)
+{
+ mutex_lock(&dst_alg_lock);
+ list_del_init(&alg->entry);
+ mutex_unlock(&dst_alg_lock);
+
+ dst_put_alg(alg);
+}
+EXPORT_SYMBOL_GPL(dst_remove_alg);
+
+static void dst_cleanup_node(struct dst_node *n)
+{
+ struct dst_storage *st = n->st;
+
+ if (n->shared_head) {
+ mutex_lock(&st->tree_lock);
+ list_del(&n->shared);
+ mutex_unlock(&st->tree_lock);
+
+ atomic_dec(&n->shared_head->refcnt);
+ dst_node_put(n->shared_head);
+ n->shared_head = NULL;
+ }
+
+ if (n->cleanup)
+ n->cleanup(n);
+ dst_node_sysfs_exit(n);
+ n->st->alg->ops->del_node(n);
+ kfree(n);
+}
+
+/*
+ * This can deadlock if called under st->tree_lock being held,
+ * so take care to only call this when reference counter can not
+ * hit zero and thus start node freeing.
+ */
+void dst_node_put(struct dst_node *n)
+{
+ if (atomic_dec_and_test(&n->refcnt)) {
+ struct dst_storage *st = n->st;
+
+ dprintk("%s: freeing node: %p, start: %llu, size: %llu, "
+ "shared_head: %p, shared_num: %d, flags: %lx.\n",
+ __func__, n, n->start, n->size,
+ n->shared_head, atomic_read(&n->shared_num),
+ n->flags);
+
+ dst_cleanup_node(n);
+ dst_put_storage(st);
+ }
+}
+EXPORT_SYMBOL_GPL(dst_node_put);
+
+static inline int dst_compare_id(struct dst_node *old, u64 new)
+{
+ if (old->start + old->size <= new)
+ return 1;
+ if (old->start > new)
+ return -1;
+ return 0;
+}
+
+/*
+ * Tree of of the nodes, which form the storage.
+ * Tree is indexed via start of the node and its size.
+ * Comparison function above.
+ */
+struct dst_node *dst_storage_tree_search(struct dst_storage *st, u64 start)
+{
+ struct rb_node *n = st->tree_root.rb_node;
+ struct dst_node *dn;
+ int cmp;
+
+ while (n) {
+ dn = rb_entry(n, struct dst_node, tree_node);
+
+ cmp = dst_compare_id(dn, start);
+ dprintk("%s: tree: %llu-%llu, new: %llu.\n",
+ __func__, dn->start, dn->start+dn->size, start);
+ if (cmp < 0)
+ n = n->rb_left;
+ else if (cmp > 0)
+ n = n->rb_right;
+ else {
+ return dst_node_get(dn);
+ }
+ }
+ return NULL;
+}
+EXPORT_SYMBOL_GPL(dst_storage_tree_search);
+
+/*
+ * This function allows to remove a node with given start address
+ * from the storage.
+ */
+static struct dst_node *dst_storage_tree_del(struct dst_storage *st, u64 start)
+{
+ struct dst_node *n = dst_storage_tree_search(st, start);
+
+ if (!n)
+ return NULL;
+
+ rb_erase(&n->tree_node, &st->tree_root);
+ dst_node_put(n);
+ return n;
+}
+
+/*
+ * This function allows to add given node to the storage.
+ * Returns -EEXIST if the same area is already covered by another node.
+ * This is return must be checked for redundancy algorithms.
+ */
+static struct dst_node *dst_storage_tree_add(struct dst_node *new,
+ struct dst_storage *st)
+{
+ struct rb_node **n = &st->tree_root.rb_node, *parent = NULL;
+ struct dst_node *dn;
+ int cmp;
+
+ while (*n) {
+ parent = *n;
+ dn = rb_entry(parent, struct dst_node, tree_node);
+
+ cmp = dst_compare_id(dn, new->start);
+ dprintk("%s: tree: %llu-%llu, new: %llu.\n",
+ __func__, dn->start, dn->start+dn->size,
+ new->start);
+ if (cmp < 0)
+ n = &parent->rb_left;
+ else if (cmp > 0)
+ n = &parent->rb_right;
+ else {
+ return dn;
+ }
+ }
+
+ rb_link_node(&new->tree_node, parent, n);
+ rb_insert_color(&new->tree_node, &st->tree_root);
+
+ return NULL;
+}
+
+/*
+ * This function finds devices major/minor numbers for given pathname.
+ */
+static int dst_lookup_device(const char *path, dev_t *dev)
+{
+ int err;
+ struct nameidata nd;
+ struct inode *inode;
+
+ err = path_lookup(path, LOOKUP_FOLLOW, &nd);
+ if (err)
+ return err;
+
+ inode = nd.dentry->d_inode;
+ if (!inode) {
+ err = -ENOENT;
+ goto out;
+ }
+
+ if (!S_ISBLK(inode->i_mode)) {
+ err = -ENOTBLK;
+ goto out;
+ }
+
+ *dev = inode->i_rdev;
+
+out:
+ path_release(&nd);
+ return err;
+}
+
+/*
+ * Cleanup routings for local, local exporting and remote nodes.
+ */
+static void dst_cleanup_remote(struct dst_node *n)
+{
+ if (n->state) {
+ kst_state_exit(n->state);
+ n->state = NULL;
+ }
+}
+
+static void dst_cleanup_local(struct dst_node *n)
+{
+ if (n->bdev) {
+ sync_blockdev(n->bdev);
+ blkdev_put(n->bdev);
+ n->bdev = NULL;
+ }
+}
+
+static void dst_cleanup_local_export(struct dst_node *n)
+{
+ dst_cleanup_local(n);
+ dst_cleanup_remote(n);
+}
+
+/*
+ * Header receiving function - may block.
+ */
+int dst_data_recv_header(struct socket *sock,
+ struct dst_remote_request *r, int block)
+{
+ struct msghdr msg;
+ struct kvec iov;
+
+ iov.iov_base = r;
+ iov.iov_len = sizeof(struct dst_remote_request);
+
+ msg.msg_iov = (struct iovec *)&iov;
+ msg.msg_iovlen = 1;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+ msg.msg_flags = (block)?MSG_WAITALL:MSG_DONTWAIT | MSG_NOSIGNAL;
+
+ return kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len,
+ msg.msg_flags);
+}
+
+/*
+ * Header sending function - may block.
+ */
+int dst_data_send_header(struct socket *sock,
+ struct dst_remote_request *r)
+{
+ struct msghdr msg;
+ struct kvec iov;
+
+ iov.iov_base = r;
+ iov.iov_len = sizeof(struct dst_remote_request);
+
+ msg.msg_iov = (struct iovec *)&iov;
+ msg.msg_iovlen = 1;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+ msg.msg_flags = MSG_WAITALL | MSG_NOSIGNAL;
+
+ return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
+}
+
+static inline void dst_node_set_size(struct dst_node *n, u64 size)
+{
+ if (n->size)
+ n->size = min(size, n->size);
+ else
+ n->size = size;
+}
+
+/*
+ * Setup routings for local, local exporting and remote nodes.
+ */
+static int dst_setup_local(struct dst_node *n, struct dst_ctl *ctl,
+ struct dst_local_ctl *l)
+{
+ dev_t dev;
+ int err;
+
+ err = dst_lookup_device(l->name, &dev);
+ if (err)
+ return err;
+
+ n->bdev = open_by_devnum(dev, FMODE_READ|FMODE_WRITE);
+ if (!n->bdev)
+ return -ENODEV;
+
+ dst_node_set_size(n, to_sector(n->bdev->bd_inode->i_size));
+
+ return 0;
+}
+
+static int dst_setup_local_export(struct dst_node *n, struct dst_ctl *ctl,
+ struct dst_le_template *tmp)
+{
+ int err;
+
+ err = dst_setup_local(n, ctl, &tmp->le->lctl);
+ if (err)
+ goto err_out_exit;
+
+ n->state = kst_listener_state_init(n, tmp);
+ if (IS_ERR(n->state)) {
+ err = PTR_ERR(n->state);
+ goto err_out_cleanup;
+ }
+
+ return 0;
+
+err_out_cleanup:
+ dst_cleanup_local(n);
+err_out_exit:
+ return err;
+}
+
+static int dst_request_remote_config(struct dst_node *n, struct socket *sock)
+{
+ struct dst_remote_request cfg;
+ int err = -EINVAL;
+
+ memset(&cfg, 0, sizeof(struct dst_remote_request));
+ cfg.cmd = cpu_to_be32(DST_REMOTE_CFG);
+
+ dprintk("%s: sending header.\n", __func__);
+ err = dst_data_send_header(sock, &cfg);
+ if (err != sizeof(struct dst_remote_request))
+ goto out;
+
+ dprintk("%s: receiving header.\n", __func__);
+ err = dst_data_recv_header(sock, &cfg, 1);
+ if (err != sizeof(struct dst_remote_request))
+ goto out;
+
+ err = -EINVAL;
+ dprintk("%s: checking result: cmd: %d, size reported: %llu, csum is supported: %u.\n",
+ __func__, be32_to_cpu(cfg.cmd), be64_to_cpu(cfg.sector), !!cfg.csum);
+ if (be32_to_cpu(cfg.cmd) != DST_REMOTE_CFG)
+ goto out;
+
+ err = 0;
+ dst_node_set_size(n, be64_to_cpu(cfg.sector));
+
+ if (cfg.csum)
+ __set_bit(DST_NODE_USE_CSUM, &n->flags);
+ else
+ __clear_bit(DST_NODE_USE_CSUM, &n->flags);
+
+out:
+ dprintk("%s: n: %p, err: %d.\n", __func__, n, err);
+ return err;
+}
+
+static int dst_setup_remote(struct dst_node *n, struct dst_ctl *ctl,
+ struct dst_remote_ctl *r)
+{
+ int err;
+ struct socket *sock;
+
+ err = sock_create(r->addr.sa_family, r->type, r->proto, &sock);
+ if (err < 0)
+ goto err_out_exit;
+
+ sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo =
+ msecs_to_jiffies(DST_DEFAULT_TIMEO);
+
+ err = sock->ops->connect(sock, (struct sockaddr *)&r->addr,
+ r->addr.sa_data_len, 0);
+ if (err)
+ goto err_out_destroy;
+
+ err = dst_request_remote_config(n, sock);
+ if (err)
+ goto err_out_destroy;
+
+ n->state = kst_data_state_init(n, sock);
+ if (IS_ERR(n->state)) {
+ err = PTR_ERR(n->state);
+ goto err_out_destroy;
+ }
+
+ return 0;
+
+err_out_destroy:
+ sock_release(sock);
+err_out_exit:
+
+ dprintk("%s: n: %p, err: %d.\n", __func__, n, err);
+ return err;
+}
+
+/*
+ * This function inserts node into storage.
+ */
+static int dst_insert_node(struct dst_node *n)
+{
+ int err;
+ struct dst_storage *st = n->st;
+ struct dst_node *dn;
+
+ err = st->alg->ops->add_node(n);
+ if (err)
+ goto err_out_exit;
+
+ err = dst_node_sysfs_init(n);
+ if (err)
+ goto err_out_remove_node;
+
+ mutex_lock(&st->tree_lock);
+ dn = dst_storage_tree_add(n, st);
+ if (dn) {
+ err = -EINVAL;
+ dn->size = st->disk_size;
+ if (dn->start == n->start)
+ err = 0;
+ }
+ mutex_unlock(&st->tree_lock);
+ if (err)
+ goto err_out_sysfs_exit;
+
+ if (n->priv_callback)
+ n->priv_callback(n);
+
+ return 0;
+
+err_out_sysfs_exit:
+ dst_node_sysfs_exit(n);
+err_out_remove_node:
+ st->alg->ops->del_node(n);
+err_out_exit:
+ return err;
+}
+
+static struct dst_node *dst_alloc_node(struct dst_ctl *ctl,
+ void (*cleanup)(struct dst_node *))
+{
+ struct dst_storage *st;
+ struct dst_node *n;
+
+ st = dst_get_storage(ctl->st, ctl->alg, 1);
+ if (!st)
+ goto err_out_exit;
+
+ n = kzalloc(sizeof(struct dst_node), GFP_KERNEL);
+ if (!n)
+ goto err_out_put_storage;
+
+ if (ctl->flags & DST_CTL_USE_CSUM)
+ __set_bit(DST_NODE_USE_CSUM, &n->flags);
+
+ n->w = kst_main_worker;
+ n->st = st;
+ n->cleanup = cleanup;
+ n->start = ctl->start;
+ n->size = ctl->size;
+ INIT_LIST_HEAD(&n->shared);
+ n->shared_head = NULL;
+ atomic_set(&n->shared_num, 0);
+ atomic_set(&n->refcnt, 1);
+
+ return n;
+
+err_out_put_storage:
+ mutex_lock(&dst_storage_lock);
+ list_del_init(&st->entry);
+ mutex_unlock(&dst_storage_lock);
+
+ dst_put_storage(st);
+err_out_exit:
+ return NULL;
+}
+
+/*
+ * Control callback for userspace commands to setup
+ * different nodes and start/stop array.
+ */
+static int dst_add_remote(struct dst_ctl *ctl, void *data, unsigned int len)
+{
+ struct dst_node *n;
+ int err;
+ struct dst_remote_ctl *rctl = data;
+
+ if (len != sizeof(struct dst_remote_ctl))
+ return -EINVAL;
+
+ n = dst_alloc_node(ctl, &dst_cleanup_remote);
+ if (!n)
+ return -ENOMEM;
+
+ err = dst_setup_remote(n, ctl, rctl);
+ if (err < 0)
+ goto err_out_free;
+
+ err = dst_insert_node(n);
+ if (err)
+ goto err_out_cleanup;
+
+ return 0;
+
+err_out_cleanup:
+ if (n->cleanup)
+ n->cleanup(n);
+err_out_free:
+ dst_put_storage(n->st);
+ kfree(n);
+ return err;
+}
+
+static int dst_add_local_export(struct dst_ctl *ctl, void *data, unsigned int len)
+{
+ struct dst_node *n;
+ int err;
+ struct dst_le_template tmp;
+
+ if (len < sizeof(struct dst_local_export_ctl))
+ return -EINVAL;
+
+ tmp.le = data;
+
+ len -= sizeof(struct dst_local_export_ctl);
+ data += sizeof(struct dst_local_export_ctl);
+
+ if (len != tmp.le->secure_attr_num * sizeof(struct dst_secure_user))
+ return -EINVAL;
+
+ tmp.data = data;
+
+ n = dst_alloc_node(ctl, &dst_cleanup_local_export);
+ if (!n)
+ return -EINVAL;
+
+ err = dst_setup_local_export(n, ctl, &tmp);
+ if (err < 0)
+ goto err_out_free;
+
+ err = dst_insert_node(n);
+ if (err)
+ goto err_out_cleanup;
+
+ return 0;
+
+err_out_cleanup:
+ if (n->cleanup)
+ n->cleanup(n);
+err_out_free:
+ dst_put_storage(n->st);
+ kfree(n);
+ return err;
+}
+
+static int dst_add_local(struct dst_ctl *ctl, void *data, unsigned int len)
+{
+ struct dst_node *n;
+ int err;
+ struct dst_local_ctl *lctl = data;
+
+ if (len != sizeof(struct dst_local_ctl))
+ return -EINVAL;
+
+ n = dst_alloc_node(ctl, &dst_cleanup_local);
+ if (!n)
+ return -EINVAL;
+
+ err = dst_setup_local(n, ctl, lctl);
+ if (err < 0)
+ goto err_out_free;
+
+ err = dst_insert_node(n);
+ if (err)
+ goto err_out_cleanup;
+
+ return 0;
+
+err_out_cleanup:
+ if (n->cleanup)
+ n->cleanup(n);
+err_out_free:
+ dst_put_storage(n->st);
+ kfree(n);
+ return err;
+}
+
+static int dst_del_node(struct dst_ctl *ctl, void *data, unsigned int len)
+{
+ struct dst_node *n;
+ struct dst_storage *st;
+ int err = -ENODEV;
+
+ if (len)
+ return -EINVAL;
+
+ st = dst_get_storage(ctl->st, ctl->alg, 0);
+ if (!st)
+ goto err_out_exit;
+
+ mutex_lock(&st->tree_lock);
+ n = dst_storage_tree_del(st, ctl->start);
+ mutex_unlock(&st->tree_lock);
+ if (!n)
+ goto err_out_put;
+
+ dst_node_put(n);
+ dst_put_storage(st);
+
+ return 0;
+
+err_out_put:
+ dst_put_storage(st);
+err_out_exit:
+ return err;
+}
+
+static int dst_start_storage(struct dst_ctl *ctl, void *data, unsigned int len)
+{
+ struct dst_storage *st;
+ int err = -ENXIO;
+
+ if (len)
+ return -EINVAL;
+
+ st = dst_get_storage(ctl->st, ctl->alg, 0);
+ if (!st)
+ return -ENODEV;
+
+ mutex_lock(&st->tree_lock);
+ if (!(st->flags & DST_ST_STARTED) && st->disk_size) {
+ set_capacity(st->disk, st->disk_size);
+ add_disk(st->disk);
+ st->flags |= DST_ST_STARTED;
+ printk("%s: STARTED name: '%s', st: %p, disk_size: %llu.\n",
+ __func__, st->name, st, st->disk_size);
+ err = 0;
+ }
+ mutex_unlock(&st->tree_lock);
+
+ dst_put_storage(st);
+
+ return err;
+}
+
+static int dst_stop_storage(struct dst_ctl *ctl, void *data, unsigned int len)
+{
+ struct dst_storage *st;
+
+ if (len)
+ return -EINVAL;
+
+ st = dst_get_storage(ctl->st, ctl->alg, 0);
+ if (!st)
+ return -ENODEV;
+
+ printk("%s: STOPPED storage: %s.\n", __func__, st->name);
+
+ dst_storage_sysfs_exit(st);
+
+ mutex_lock(&dst_storage_lock);
+ list_del_init(&st->entry);
+ mutex_unlock(&dst_storage_lock);
+
+ if (st->flags & DST_ST_STARTED)
+ del_gendisk(st->disk);
+
+ dst_remove_all_nodes(st);
+ dst_put_storage(st); /* One reference got above */
+ dst_put_storage(st); /* Another reference set during initialization */
+
+ return 0;
+}
+
+typedef int (*dst_command_func)(struct dst_ctl *ctl, void *data, unsigned int len);
+
+/*
+ * List of userspace commands.
+ */
+static dst_command_func dst_commands[] = {
+ [DST_ADD_REMOTE] = &dst_add_remote,
+ [DST_ADD_LOCAL] = &dst_add_local,
+ [DST_ADD_LOCAL_EXPORT] = &dst_add_local_export,
+ [DST_DEL_NODE] = &dst_del_node,
+ [DST_START_STORAGE] = &dst_start_storage,
+ [DST_STOP_STORAGE] = &dst_stop_storage,
+};
+
+/*
+ * Configuration parser.
+ */
+static void cn_dst_callback(void *data)
+{
+ struct dst_ctl *ctl;
+ struct cn_msg *msg = data;
+ int err;
+ struct dst_ctl_ack *ack;
+
+ if (msg->len < sizeof(struct dst_ctl)) {
+ err = -EBADMSG;
+ goto out;
+ }
+
+ ctl = (struct dst_ctl *)msg->data;
+
+ if (ctl->cmd >= DST_CMD_MAX) {
+ err = -EINVAL;
+ goto out;
+ }
+
+ err = dst_commands[ctl->cmd](ctl, msg->data + sizeof(struct dst_ctl),
+ msg->len - sizeof(struct dst_ctl));
+
+out:
+ ack = kmalloc(sizeof(struct dst_ctl_ack), GFP_KERNEL);
+ if (!ack)
+ return;
+
+ memcpy(&ack->msg, msg, sizeof(struct cn_msg));
+
+ ack->msg.ack = msg->ack + 1;
+ ack->msg.len = sizeof(struct dst_ctl_ack) - sizeof(struct cn_msg);
+
+ ack->error = err;
+
+ cn_netlink_send(&ack->msg, 0, GFP_KERNEL);
+ kfree(ack);
+}
+
+static int dst_sysfs_init(void)
+{
+ return bus_register(&dst_dev_bus_type);
+}
+
+static void dst_sysfs_exit(void)
+{
+ bus_unregister(&dst_dev_bus_type);
+}
+
+static int __init dst_sys_init(void)
+{
+ int err = -ENOMEM;
+
+ dst_request_cache = kmem_cache_create("dst", sizeof(struct dst_request),
+ 0, 0, NULL, NULL);
+ if (!dst_request_cache)
+ return -ENOMEM;
+
+ dst_bio_set = bioset_create(32, 32);
+ if (!dst_bio_set)
+ goto err_out_destroy;
+
+ err = register_blkdev(dst_major, DST_NAME);
+ if (err < 0)
+ goto err_out_destroy_bioset;
+ if (err)
+ dst_major = err;
+
+ err = dst_sysfs_init();
+ if (err)
+ goto err_out_unregister;
+
+ kst_main_worker = kst_worker_init(0);
+ if (IS_ERR(kst_main_worker)) {
+ err = PTR_ERR(kst_main_worker);
+ goto err_out_sysfs_exit;
+ }
+
+ err = cn_add_callback(&cn_dst_id, "DST", cn_dst_callback);
+ if (err)
+ goto err_out_worker_exit;
+
+ printk(KERN_INFO "Distributed storage, '%s' release.\n", dst_name);
+
+ return 0;
+
+err_out_worker_exit:
+ kst_worker_exit(kst_main_worker);
+err_out_sysfs_exit:
+ dst_sysfs_exit();
+err_out_unregister:
+ unregister_blkdev(dst_major, DST_NAME);
+err_out_destroy_bioset:
+ bioset_free(dst_bio_set);
+err_out_destroy:
+ kmem_cache_destroy(dst_request_cache);
+ return err;
+}
+
+static void __exit dst_sys_exit(void)
+{
+ cn_del_callback(&cn_dst_id);
+ dst_sysfs_exit();
+ unregister_blkdev(dst_major, DST_NAME);
+ kst_exit_all();
+ bioset_free(dst_bio_set);
+ kmem_cache_destroy(dst_request_cache);
+}
+
+module_init(dst_sys_init);
+module_exit(dst_sys_exit);
+
+MODULE_DESCRIPTION("Distributed storage");
+MODULE_AUTHOR("Evgeniy Polyakov <[email protected]>");
+MODULE_LICENSE("GPL");
diff --git a/include/linux/connector.h b/include/linux/connector.h
index 10eb56b..9e67d58 100644
--- a/include/linux/connector.h
+++ b/include/linux/connector.h
@@ -36,9 +36,11 @@
#define CN_VAL_CIFS 0x1
#define CN_W1_IDX 0x3 /* w1 communication */
#define CN_W1_VAL 0x1
+#define CN_DST_IDX 0x4 /* Distributed storage */
+#define CN_DST_VAL 0x1


-#define CN_NETLINK_USERS 4
+#define CN_NETLINK_USERS 5

/*
* Maximum connector's message size.
diff --git a/include/linux/dst.h b/include/linux/dst.h
new file mode 100644
index 0000000..df92d54
--- /dev/null
+++ b/include/linux/dst.h
@@ -0,0 +1,404 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#ifndef __DST_H
+#define __DST_H
+
+#include <linux/types.h>
+#include <linux/connector.h>
+
+#define DST_NAMELEN 32
+#define DST_NAME "dst"
+#define DST_IOCTL 0xba
+
+enum {
+ DST_DEL_NODE = 0, /* Remove node with given id from storage */
+ DST_ADD_REMOTE, /* Add remote node with given id to the storage */
+ DST_ADD_LOCAL, /* Add local node with given id to the storage */
+ DST_ADD_LOCAL_EXPORT, /* Add local node with given id to the storage to be exported and used by remote peers */
+ DST_START_STORAGE, /* Array is ready and storage can be started, if there will be new nodes
+ * added to the storage, they will be checked against existing size and
+ * probably be dropped (for example in mirror format when new node has smaller
+ * size than array created) or inserted.
+ */
+ DST_STOP_STORAGE, /* Remove array and all nodes. */
+ DST_CMD_MAX
+};
+
+#define DST_CTL_FLAGS_REMOTE (1<<0)
+#define DST_CTL_FLAGS_EXPORT (1<<1)
+#define DST_CTL_USE_CSUM (1<<2)
+
+struct dst_ctl
+{
+ char st[DST_NAMELEN];
+ char alg[DST_NAMELEN];
+ __u32 flags, cmd;
+ __u64 start, size;
+};
+
+struct dst_ctl_ack
+{
+ struct cn_msg msg;
+ int error;
+ int unused[3];
+};
+
+struct dst_local_ctl
+{
+ char name[DST_NAMELEN];
+};
+
+#define SADDR_MAX_DATA 128
+
+struct saddr {
+ unsigned short sa_family; /* address family, AF_xxx */
+ char sa_data[SADDR_MAX_DATA]; /* 14 bytes of protocol address */
+ unsigned short sa_data_len; /* Number of bytes used in sa_data */
+};
+
+struct dst_remote_ctl
+{
+ __u16 type;
+ __u16 proto;
+ struct saddr addr;
+};
+
+#define DST_PERM_READ (1<<0)
+#define DST_PERM_WRITE (1<<1)
+
+/*
+ * Right now it is simple model, where each remote address
+ * is assigned to set of permissions it is allowed to perform.
+ * In real world block device does not know anything but
+ * reading and writing, so it should be more than enough.
+ */
+struct dst_secure_user
+{
+ unsigned int permissions;
+ unsigned short check_offset;
+ struct saddr addr;
+};
+
+struct dst_local_export_ctl
+{
+ __u32 backlog;
+ int secure_attr_num;
+ struct dst_local_ctl lctl;
+ struct dst_remote_ctl rctl;
+};
+
+enum {
+ DST_REMOTE_CFG = 1, /* Request remote configuration */
+ DST_WRITE, /* Writing */
+ DST_READ, /* Reading */
+ DST_NCMD_MAX,
+};
+
+struct dst_remote_request
+{
+ __u32 cmd;
+ __u32 csum;
+ __u32 size;
+ __u32 offset;
+ __u64 sector;
+};
+
+#ifdef __KERNEL__
+
+#include <linux/rbtree.h>
+#include <linux/net.h>
+#include <linux/blkdev.h>
+#include <linux/bio.h>
+#include <linux/mempool.h>
+#include <linux/device.h>
+#include <linux/crc32c.h>
+
+//#define CONFIG_DST_DEBUG
+
+#ifdef CONFIG_DST_DEBUG
+#define dprintk(f, a...) printk(KERN_NOTICE f, ##a)
+#else
+static inline void __attribute__ ((format (printf, 1, 2))) dprintk(const char * fmt, ...) {}
+#endif
+
+struct kst_worker
+{
+ struct list_head entry;
+
+ struct list_head state_list;
+ struct mutex state_mutex;
+
+ struct list_head ready_list;
+ spinlock_t ready_lock;
+
+ mempool_t *req_pool;
+
+ struct task_struct *thread;
+
+ wait_queue_head_t wait;
+
+ int id;
+};
+
+struct kst_state;
+struct dst_node;
+
+#define DST_REQ_HEADER_SENT (1<<0)
+#define DST_REQ_EXPORT (1<<1)
+#define DST_REQ_EXPORT_WRITE (1<<2)
+#define DST_REQ_EXPORT_READ (1<<3)
+#define DST_REQ_ALWAYS_QUEUE (1<<4)
+#define DST_REQ_CHEKSUM_RECV (1<<5)
+#define DST_REQ_CHECK_QUEUE (1<<6)
+
+struct dst_request
+{
+ struct list_head request_list_entry;
+ struct bio *bio;
+ struct kst_state *state;
+ struct dst_node *node;
+
+ u32 tmp_csum, tmp_offset;
+
+ u32 flags;
+
+ u32 offset;
+ int idx, num;
+
+ int (*callback)(struct dst_request *dst,
+ unsigned int revents);
+ void (*bio_endio)(struct dst_request *dst,
+ int err);
+
+ atomic_t refcnt;
+ void *priv;
+
+ u64 size, orig_size, start;
+};
+
+struct kst_state_ops
+{
+ int (*init)(struct kst_state *, void *);
+ int (*push)(struct dst_request *req);
+ int (*ready)(struct kst_state *);
+ int (*recovery)(struct kst_state *, int err);
+ void (*exit)(struct kst_state *);
+};
+
+struct kst_state
+{
+ struct list_head entry;
+ struct list_head ready_entry;
+
+ wait_queue_t wait;
+ wait_queue_head_t *whead;
+
+ struct dst_node *node;
+ struct socket *socket;
+
+ u32 permissions;
+
+ struct mutex request_lock;
+ struct list_head request_list;
+
+ struct kst_state_ops *ops;
+};
+
+#define DST_DEFAULT_TIMEO 2000
+
+struct dst_storage;
+
+struct dst_alg_ops
+{
+ int (*add_node)(struct dst_node *n);
+ void (*del_node)(struct dst_node *n);
+ int (*remap)(struct dst_request *req);
+ int (*error)(struct kst_state *state, int err);
+ struct module *owner;
+};
+
+struct dst_alg
+{
+ struct list_head entry;
+ char name[DST_NAMELEN];
+ atomic_t refcnt;
+ struct dst_alg_ops *ops;
+};
+
+#define DST_ST_STARTED (1<<0)
+
+struct dst_storage
+{
+ struct list_head entry;
+ char name[DST_NAMELEN];
+ struct dst_alg *alg;
+ atomic_t refcnt;
+ struct mutex tree_lock;
+ struct rb_root tree_root;
+
+ struct request_queue *queue;
+ struct gendisk *disk;
+
+ long flags;
+ u64 disk_size;
+
+ struct device device;
+};
+
+#define DST_NODE_FROZEN 0
+#define DST_NODE_NOTSYNC 1
+#define DST_NODE_USE_CSUM 2
+
+struct dst_node
+{
+ struct rb_node tree_node;
+
+ struct list_head shared;
+ struct dst_node *shared_head;
+
+ struct block_device *bdev;
+ struct dst_storage *st;
+ struct kst_state *state;
+ struct kst_worker *w;
+
+ atomic_t refcnt;
+ atomic_t shared_num;
+
+ void (*cleanup)(struct dst_node *);
+
+ long flags;
+
+ u64 start, size;
+
+ void (*priv_callback)(struct dst_node *);
+ void *priv;
+
+ struct device device;
+};
+
+struct dst_le_template
+{
+ struct dst_local_export_ctl *le;
+ void *data;
+};
+
+struct dst_secure
+{
+ struct list_head sec_entry;
+ struct dst_secure_user sec;
+};
+
+void kst_state_exit(struct kst_state *st);
+
+struct kst_worker *kst_worker_init(int id);
+void kst_worker_exit(struct kst_worker *w);
+
+struct kst_state *kst_listener_state_init(struct dst_node *node,
+ struct dst_le_template *tmp);
+struct kst_state *kst_data_state_init(struct dst_node *node,
+ struct socket *newsock);
+
+void kst_wake(struct kst_state *st);
+
+void kst_exit_all(void);
+
+struct dst_alg *dst_alloc_alg(char *name, struct dst_alg_ops *ops);
+void dst_remove_alg(struct dst_alg *alg);
+
+struct dst_node *dst_storage_tree_search(struct dst_storage *st, u64 start);
+
+void dst_node_put(struct dst_node *n);
+
+static inline struct dst_node *dst_node_get(struct dst_node *n)
+{
+ atomic_inc(&n->refcnt);
+ return n;
+}
+
+struct dst_request *dst_clone_request(struct dst_request *req, mempool_t *pool);
+void dst_free_request(struct dst_request *req);
+
+void kst_complete_req(struct dst_request *req, int err);
+void kst_bio_endio(struct dst_request *req, int err);
+void kst_del_req(struct dst_request *req);
+int kst_enqueue_req(struct kst_state *st, struct dst_request *req);
+
+int kst_data_callback(struct dst_request *req, unsigned int revents);
+
+extern struct kmem_cache *dst_request_cache;
+
+static inline sector_t to_sector(unsigned long long n)
+{
+ return (n >> 9);
+}
+
+static inline unsigned long to_bytes(sector_t n)
+{
+ return (n << 9);
+}
+
+/*
+ * Checks state's permissions.
+ * Returns -EPERM if check failed.
+ */
+static inline int kst_check_permissions(struct kst_state *st, struct bio *bio)
+{
+ if ((bio_rw(bio) == WRITE) && !(st->permissions & DST_PERM_WRITE))
+ return -EPERM;
+
+ return 0;
+}
+
+static inline __u32 dst_csum_data(unsigned char *d, unsigned int size)
+{
+ return crc32c_le(0, d, size);
+}
+
+static inline void kst_convert_header(struct dst_remote_request *r)
+{
+ r->cmd = be32_to_cpu(r->cmd);
+ r->sector = be64_to_cpu(r->sector);
+ r->offset = be32_to_cpu(r->offset);
+ r->size = be32_to_cpu(r->size);
+ r->csum = be32_to_cpu(r->csum);
+}
+
+extern int dst_data_send_header(struct socket *sock,
+ struct dst_remote_request *r);
+extern int dst_data_recv_header(struct socket *sock,
+ struct dst_remote_request *r, int block);
+
+static inline void dst_fill_request(struct dst_request *req, struct bio *bio,
+ u64 start, struct dst_node *n, void (*req_bio_endio)(struct dst_request *req, int err))
+{
+ req->flags = (test_bit(DST_NODE_FROZEN, &n->flags))?
+ DST_REQ_ALWAYS_QUEUE:0;
+ req->start = start;
+ req->offset = 0;
+ req->state = n->state;
+ req->node = n;
+ req->bio = bio;
+
+ req->size = bio->bi_size;
+ req->orig_size = bio->bi_size;
+ req->idx = bio->bi_idx;
+ req->num = bio->bi_vcnt;
+
+ req->bio_endio = req_bio_endio;
+}
+
+#endif /* __KERNEL__ */
+#endif /* __DST_H */

2007-12-17 15:05:14

by Evgeniy Polyakov

[permalink] [raw]
Subject: [3/4] DST: Network state machine.


Network state machine.

Includes network async processing state machine and related tasks.

Signed-off-by: Evgeniy Polyakov <[email protected]>


diff --git a/drivers/block/dst/kst.c b/drivers/block/dst/kst.c
new file mode 100644
index 0000000..6d92014
--- /dev/null
+++ b/drivers/block/dst/kst.c
@@ -0,0 +1,1515 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/socket.h>
+#include <linux/kthread.h>
+#include <linux/net.h>
+#include <linux/in.h>
+#include <linux/poll.h>
+#include <linux/bio.h>
+#include <linux/dst.h>
+
+#include <net/sock.h>
+
+struct kst_poll_helper
+{
+ poll_table pt;
+ struct kst_state *st;
+};
+
+static LIST_HEAD(kst_worker_list);
+static DEFINE_MUTEX(kst_worker_mutex);
+
+/*
+ * This function creates bound socket for local export node.
+ */
+static int kst_sock_create(struct kst_state *st, struct saddr *addr,
+ int type, int proto, int backlog)
+{
+ int err;
+
+ err = sock_create(addr->sa_family, type, proto, &st->socket);
+ if (err)
+ goto err_out_exit;
+
+ err = st->socket->ops->bind(st->socket, (struct sockaddr *)addr,
+ addr->sa_data_len);
+
+ err = st->socket->ops->listen(st->socket, backlog);
+ if (err)
+ goto err_out_release;
+
+ st->socket->sk->sk_allocation = GFP_NOIO;
+
+ return 0;
+
+err_out_release:
+ sock_release(st->socket);
+err_out_exit:
+ return err;
+}
+
+static void kst_sock_release(struct kst_state *st)
+{
+ if (st->socket) {
+ sock_release(st->socket);
+ st->socket = NULL;
+ }
+}
+
+void kst_wake(struct kst_state *st)
+{
+ if (st) {
+ struct kst_worker *w = st->node->w;
+ unsigned long flags;
+
+ spin_lock_irqsave(&w->ready_lock, flags);
+ if (list_empty(&st->ready_entry))
+ list_add_tail(&st->ready_entry, &w->ready_list);
+ spin_unlock_irqrestore(&w->ready_lock, flags);
+
+ wake_up(&w->wait);
+ }
+}
+EXPORT_SYMBOL_GPL(kst_wake);
+
+/*
+ * Polling machinery.
+ */
+static int kst_state_wake_callback(wait_queue_t *wait, unsigned mode,
+ int sync, void *key)
+{
+ struct kst_state *st = container_of(wait, struct kst_state, wait);
+ kst_wake(st);
+ return 1;
+}
+
+static void kst_queue_func(struct file *file, wait_queue_head_t *whead,
+ poll_table *pt)
+{
+ struct kst_state *st = container_of(pt, struct kst_poll_helper, pt)->st;
+
+ st->whead = whead;
+ init_waitqueue_func_entry(&st->wait, kst_state_wake_callback);
+ add_wait_queue(whead, &st->wait);
+}
+
+static void kst_poll_exit(struct kst_state *st)
+{
+ if (st->whead) {
+ remove_wait_queue(st->whead, &st->wait);
+ st->whead = NULL;
+ }
+}
+
+/*
+ * This function removes request from state tree and ordering list.
+ */
+void kst_del_req(struct dst_request *req)
+{
+ list_del_init(&req->request_list_entry);
+}
+EXPORT_SYMBOL_GPL(kst_del_req);
+
+static struct dst_request *kst_req_first(struct kst_state *st)
+{
+ struct dst_request *req = NULL;
+
+ if (!list_empty(&st->request_list))
+ req = list_entry(st->request_list.next, struct dst_request,
+ request_list_entry);
+ return req;
+}
+
+/*
+ * This function dequeues first request from the queue and tree.
+ */
+static struct dst_request *kst_dequeue_req(struct kst_state *st)
+{
+ struct dst_request *req;
+
+ mutex_lock(&st->request_lock);
+ req = kst_req_first(st);
+ if (req)
+ kst_del_req(req);
+ mutex_unlock(&st->request_lock);
+ return req;
+}
+
+/*
+ * This function enqueues request into tree, indexed by start of the request,
+ * and also puts request into ordered queue.
+ */
+int kst_enqueue_req(struct kst_state *st, struct dst_request *req)
+{
+ if (unlikely(req->flags & DST_REQ_CHECK_QUEUE)) {
+ struct dst_request *r;
+
+ list_for_each_entry(r, &st->request_list, request_list_entry) {
+ if (bio_rw(r->bio) != bio_rw(req->bio))
+ continue;
+
+ if (r->start >= req->start + req->size)
+ continue;
+
+ if (r->start + r->size <= req->start)
+ continue;
+
+ return -EEXIST;
+ }
+ }
+
+ list_add_tail(&req->request_list_entry, &st->request_list);
+ return 0;
+}
+EXPORT_SYMBOL_GPL(kst_enqueue_req);
+
+/*
+ * BIOs for local exporting node are freed via this function.
+ */
+static void kst_export_put_bio(struct bio *bio)
+{
+ int i;
+ struct bio_vec *bv;
+
+ dprintk("%s: bio: %p, size: %u, idx: %d, num: %d, req: %p.\n",
+ __func__, bio, bio->bi_size, bio->bi_idx,
+ bio->bi_vcnt, bio->bi_private);
+
+ bio_for_each_segment(bv, bio, i)
+ __free_page(bv->bv_page);
+ bio_put(bio);
+}
+
+/*
+ * This is a generic request completion function for requests,
+ * queued for async processing.
+ * If it is local export node, state machine is different,
+ * see details below.
+ */
+void kst_complete_req(struct dst_request *req, int err)
+{
+ dprintk("%s: bio: %p, req: %p, size: %llu, orig_size: %llu, "
+ "bi_size: %u, err: %d, flags: %u.\n",
+ __func__, req->bio, req, req->size, req->orig_size,
+ req->bio->bi_size, err, req->flags);
+
+ if (req->flags & DST_REQ_EXPORT) {
+ if (err || !(req->flags & DST_REQ_EXPORT_WRITE)) {
+ req->bio_endio(req, err);
+ goto out;
+ }
+
+ req->bio->bi_rw = WRITE;
+ generic_make_request(req->bio);
+ } else {
+ req->bio_endio(req, err);
+ }
+out:
+ dst_free_request(req);
+}
+EXPORT_SYMBOL_GPL(kst_complete_req);
+
+static void kst_flush_requests(struct kst_state *st)
+{
+ struct dst_request *req;
+
+ while ((req = kst_dequeue_req(st)) != NULL)
+ kst_complete_req(req, -EIO);
+}
+
+static int kst_poll_init(struct kst_state *st)
+{
+ struct kst_poll_helper ph;
+
+ ph.st = st;
+ init_poll_funcptr(&ph.pt, &kst_queue_func);
+
+ st->socket->ops->poll(NULL, st->socket, &ph.pt);
+ return 0;
+}
+
+/*
+ * Main state creation function.
+ * It creates new state according to given operations
+ * and links it into worker structure and node.
+ */
+static struct kst_state *kst_state_init(struct dst_node *node,
+ unsigned int permissions,
+ struct kst_state_ops *ops, void *data)
+{
+ struct kst_state *st;
+ int err;
+
+ st = kzalloc(sizeof(struct kst_state), GFP_KERNEL);
+ if (!st)
+ return ERR_PTR(-ENOMEM);
+
+ st->permissions = permissions;
+ st->node = node;
+ st->ops = ops;
+ INIT_LIST_HEAD(&st->ready_entry);
+ INIT_LIST_HEAD(&st->entry);
+ INIT_LIST_HEAD(&st->request_list);
+ mutex_init(&st->request_lock);
+
+ err = st->ops->init(st, data);
+ if (err)
+ goto err_out_free;
+ mutex_lock(&node->w->state_mutex);
+ list_add_tail(&st->entry, &node->w->state_list);
+ mutex_unlock(&node->w->state_mutex);
+
+ kst_wake(st);
+
+ return st;
+
+err_out_free:
+ kfree(st);
+ return ERR_PTR(err);
+}
+
+/*
+ * This function is called when node is removed,
+ * or when state is destroyed for connected to local exporting
+ * node client.
+ */
+void kst_state_exit(struct kst_state *st)
+{
+ struct kst_worker *w = st->node->w;
+
+ mutex_lock(&w->state_mutex);
+ list_del_init(&st->entry);
+ mutex_unlock(&w->state_mutex);
+
+ st->ops->exit(st);
+
+ if (st == st->node->state)
+ st->node->state = NULL;
+
+ kfree(st);
+}
+
+static int kst_error(struct kst_state *st, int err)
+{
+ if ((err == -ECONNRESET || err == -EPIPE) && st->ops->recovery)
+ err = st->ops->recovery(st, err);
+
+ return st->node->st->alg->ops->error(st, err);
+}
+
+/*
+ * This is main state processing function.
+ * It tries to complete request and invoke appropriate
+ * callbacks in case of errors or successfull operation finish.
+ */
+static int kst_thread_process_state(struct kst_state *st)
+{
+ int err, empty;
+ unsigned int revents;
+ struct dst_request *req, *tmp;
+
+ mutex_lock(&st->request_lock);
+ if (st->ops->ready) {
+ err = st->ops->ready(st);
+ if (err) {
+ mutex_unlock(&st->request_lock);
+ if (err < 0)
+ kst_state_exit(st);
+ return err;
+ }
+ }
+
+ err = 0;
+ empty = 1;
+ req = NULL;
+ list_for_each_entry_safe(req, tmp, &st->request_list, request_list_entry) {
+ empty = 0;
+ revents = st->socket->ops->poll(st->socket->file,
+ st->socket, NULL);
+ if (!revents)
+ break;
+ err = req->callback(req, revents);
+ if (req->size && !err)
+ err = 1;
+
+ if (err < 0 || !req->size) {
+ if (!req->size)
+ err = 0;
+ kst_del_req(req);
+ kst_complete_req(req, err);
+ }
+
+ if (err)
+ break;
+ }
+
+ dprintk("%s: broke the loop: err: %d, list_empty: %d.\n",
+ __func__, err, list_empty(&st->request_list));
+ mutex_unlock(&st->request_lock);
+
+ if (err < 0) {
+ dprintk("%s: req: %p, err: %d, st: %p, node->state: %p.\n",
+ __func__, req, err, st, st->node->state);
+
+ if (st != st->node->state) {
+ /*
+ * Accepted client has state not related to storage
+ * node, so it must be freed explicitely.
+ * We do not try to fix clients connections to local
+ * export nodes, just drop the client.
+ */
+
+ kst_state_exit(st);
+ return err;
+ }
+
+ err = kst_error(st, err);
+ if (err)
+ return err;
+
+ kst_wake(st);
+ }
+
+ if (list_empty(&st->request_list) && !empty)
+ kst_wake(st);
+
+ return err;
+}
+
+/*
+ * Main worker thread - one per storage.
+ */
+static int kst_thread_func(void *data)
+{
+ struct kst_worker *w = data;
+ struct kst_state *st;
+ unsigned long flags;
+ int err = 0;
+
+ while (!kthread_should_stop()) {
+ wait_event_interruptible_timeout(w->wait,
+ (!list_empty(&w->ready_list) && !list_empty(&w->state_list)) ||
+ kthread_should_stop(), HZ);
+ st = NULL;
+ spin_lock_irqsave(&w->ready_lock, flags);
+ if (!list_empty(&w->ready_list)) {
+ st = list_entry(w->ready_list.next, struct kst_state,
+ ready_entry);
+ list_del_init(&st->ready_entry);
+ }
+ spin_unlock_irqrestore(&w->ready_lock, flags);
+
+ if (!st)
+ continue;
+
+ err = kst_thread_process_state(st);
+ }
+
+ return err;
+}
+
+/*
+ * Worker initialization - this object will host andprocess all states,
+ * which in turn host requests for remote targets.
+ */
+struct kst_worker *kst_worker_init(int id)
+{
+ struct kst_worker *w;
+ int err;
+
+ w = kzalloc(sizeof(struct kst_worker), GFP_KERNEL);
+ if (!w)
+ return ERR_PTR(-ENOMEM);
+
+ w->id = id;
+ init_waitqueue_head(&w->wait);
+ spin_lock_init(&w->ready_lock);
+ mutex_init(&w->state_mutex);
+
+ INIT_LIST_HEAD(&w->ready_list);
+ INIT_LIST_HEAD(&w->state_list);
+
+ w->req_pool = mempool_create_slab_pool(256, dst_request_cache);
+ if (!w->req_pool) {
+ err = -ENOMEM;
+ goto err_out_free;
+ }
+
+ w->thread = kthread_run(&kst_thread_func, w, "kst%d", w->id);
+ if (IS_ERR(w->thread)) {
+ err = PTR_ERR(w->thread);
+ goto err_out_destroy;
+ }
+
+ mutex_lock(&kst_worker_mutex);
+ list_add_tail(&w->entry, &kst_worker_list);
+ mutex_unlock(&kst_worker_mutex);
+
+ return w;
+
+err_out_destroy:
+ mempool_destroy(w->req_pool);
+err_out_free:
+ kfree(w);
+ return ERR_PTR(err);
+}
+
+void kst_worker_exit(struct kst_worker *w)
+{
+ struct kst_state *st, *n;
+
+ mutex_lock(&kst_worker_mutex);
+ list_del(&w->entry);
+ mutex_unlock(&kst_worker_mutex);
+
+ kthread_stop(w->thread);
+
+ list_for_each_entry_safe(st, n, &w->state_list, entry) {
+ kst_state_exit(st);
+ }
+
+ mempool_destroy(w->req_pool);
+ kfree(w);
+}
+
+/*
+ * Common state exit callback.
+ * Removes itself from worker's list of states,
+ * releases socket and flushes all requests.
+ */
+static void kst_common_exit(struct kst_state *st)
+{
+ unsigned long flags;
+ struct kst_worker *w = st->node->w;
+
+ kst_poll_exit(st);
+
+ spin_lock_irqsave(&w->ready_lock, flags);
+ list_del_init(&st->ready_entry);
+ spin_unlock_irqrestore(&w->ready_lock, flags);
+
+ kst_flush_requests(st);
+ kst_sock_release(st);
+}
+
+/*
+ * Listen socket contains security attributes in request_list,
+ * so it can not be flushed via usual way.
+ */
+static void kst_listen_flush(struct kst_state *st)
+{
+ struct dst_secure *s, *tmp;
+
+ list_for_each_entry_safe(s, tmp, &st->request_list, sec_entry) {
+ list_del(&s->sec_entry);
+ kfree(s);
+ }
+}
+
+static void kst_listen_exit(struct kst_state *st)
+{
+ kst_listen_flush(st);
+ kst_common_exit(st);
+}
+
+/*
+ * BIO vector receiving function - does not block, but may sleep because
+ * of scheduling policy.
+ */
+static int kst_data_recv_bio_vec(struct kst_state *st, struct bio_vec *bv,
+ unsigned int offset, unsigned int size)
+{
+ struct msghdr msg;
+ struct kvec iov;
+ void *kaddr;
+ int err;
+
+ kaddr = kmap(bv->bv_page);
+
+ iov.iov_base = kaddr + bv->bv_offset + offset;
+ iov.iov_len = size;
+
+ msg.msg_iov = (struct iovec *)&iov;
+ msg.msg_iovlen = 1;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+ msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+
+ err = kernel_recvmsg(st->socket, &msg, &iov, 1, iov.iov_len,
+ msg.msg_flags);
+ kunmap(bv->bv_page);
+
+ return err;
+}
+
+/*
+ * BIO vector sending function - does not block, but may sleep because
+ * of scheduling policy.
+ */
+static int kst_data_send_bio_vec(struct kst_state *st, struct bio_vec *bv,
+ unsigned int offset, unsigned int size)
+{
+ return kernel_sendpage(st->socket, bv->bv_page,
+ bv->bv_offset + offset, size,
+ MSG_DONTWAIT | MSG_NOSIGNAL);
+}
+
+static int kst_data_send_bio_vec_slow(struct kst_state *st, struct bio_vec *bv,
+ unsigned int offset, unsigned int size)
+{
+ struct msghdr msg;
+ struct kvec iov;
+ void *addr;
+ int err;
+
+ addr = kmap(bv->bv_page);
+ iov.iov_base = addr + bv->bv_offset + offset;
+ iov.iov_len = size;
+
+ msg.msg_iov = (struct iovec *)&iov;
+ msg.msg_iovlen = 1;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+ msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+
+ err = kernel_sendmsg(st->socket, &msg, &iov, 1, iov.iov_len);
+ kunmap(bv->bv_page);
+
+ return err;
+}
+
+static u32 dst_csum_bvec(struct bio_vec *bv, unsigned int offset, unsigned int size)
+{
+ void *addr;
+ u32 csum;
+
+ addr = kmap_atomic(bv->bv_page, KM_USER0);
+ csum = dst_csum_data(addr + bv->bv_offset + offset, size);
+ kunmap_atomic(addr, KM_USER0);
+
+ return csum;
+}
+
+typedef int (*kst_data_process_bio_vec_t)(struct kst_state *st,
+ struct bio_vec *bv, unsigned int offset, unsigned int size);
+
+/*
+ * @req: processing request.
+ * Contains BIO and all related to its processing info.
+ *
+ * This function sends or receives requested number of pages from given BIO.
+ *
+ * In case of errors negative value is returned and @size,
+ * @index and @off are set to the:
+ * - number of bytes not yet processed (i.e. the rest of the bytes to be
+ * processed).
+ * - index of the last bio_vec started to be processed (header sent).
+ * - offset of the first byte to be processed in the bio_vec.
+ *
+ * If there are no errors, zero is returned.
+ * -EAGAIN is not an error and is transformed into zero return value,
+ * called must check if @size is zero, in that case whole BIO is processed
+ * and thus req->bio_endio() can be called, othervise new request must be allocated
+ * to be processed later.
+ */
+static int kst_data_process_bio(struct dst_request *req)
+{
+ int err = -ENOSPC;
+ struct dst_remote_request r;
+ kst_data_process_bio_vec_t func;
+ unsigned int cur_size;
+ int use_csum = test_bit(DST_NODE_USE_CSUM, &req->node->flags);
+
+ if (bio_rw(req->bio) == WRITE) {
+ int i;
+
+ func = kst_data_send_bio_vec;
+ for (i=req->idx; i<req->num; ++i) {
+ struct bio_vec *bv = bio_iovec_idx(req->bio, i);
+
+ if (PageSlab(bv->bv_page)) {
+ func = kst_data_send_bio_vec_slow;
+ break;
+ }
+ }
+ r.cmd = cpu_to_be32(DST_WRITE);
+ } else {
+ r.cmd = cpu_to_be32(DST_READ);
+ func = kst_data_recv_bio_vec;
+ }
+
+ dprintk("%s: start: [%c], state: %p, node: %p, start: %llu, idx: %d, num: %d, "
+ "size: %llu, offset: %u, flags: %x, use_csum: %d.\n",
+ __func__, (bio_rw(req->bio) == WRITE)?'W':'R', req->state, req->node,
+ req->start, req->idx, req->num, req->size, req->offset,
+ req->flags, use_csum);
+
+ while (req->idx < req->num) {
+ struct bio_vec *bv = bio_iovec_idx(req->bio, req->idx);
+
+ cur_size = min_t(u64, bv->bv_len - req->offset, req->size);
+
+ dprintk("%s: page: %p, slab: %d, count: %d, max: %d, off: %u, len: %u, req->offset: %u, "
+ "req->size: %llu, cur_size: %u, flags: %x, "
+ "use_csum: %d, req->csum: %x.\n",
+ __func__, bv->bv_page, PageSlab(bv->bv_page),
+ atomic_read(&bv->bv_page->_count), req->bio->bi_vcnt,
+ bv->bv_offset, bv->bv_len,
+ req->offset, req->size, cur_size,
+ req->flags, use_csum, req->tmp_csum);
+
+ if (cur_size == 0) {
+ printk(KERN_ERR "%s: %d/%d: start: %llu, "
+ "bv_offset: %u, bv_len: %u, "
+ "req_offset: %u, req_size: %llu, "
+ "req: %p, bio: %p, err: %d.\n",
+ __func__, req->idx, req->num, req->start,
+ bv->bv_offset, bv->bv_len,
+ req->offset, req->size,
+ req, req->bio, err);
+ BUG();
+ }
+
+ if (!(req->flags & DST_REQ_HEADER_SENT)) {
+ r.sector = cpu_to_be64(req->start);
+ r.offset = cpu_to_be32(bv->bv_offset + req->offset);
+ r.size = cpu_to_be32(cur_size);
+ r.csum = 0;
+
+ if (use_csum && bio_rw(req->bio) == WRITE &&
+ !req->tmp_offset) {
+ req->tmp_offset = req->offset;
+ r.csum = cpu_to_be32(dst_csum_bvec(bv,
+ req->offset, cur_size));
+ }
+
+ err = dst_data_send_header(req->state->socket, &r);
+ dprintk("%s: %d/%d: sending header: cmd: %u, start: %llu, "
+ "bv_offset: %u, bv_len: %u, "
+ "a offset: %u, offset: %u, "
+ "cur_size: %u, err: %d.\n",
+ __func__, req->idx, req->num, be32_to_cpu(r.cmd),
+ req->start, bv->bv_offset, bv->bv_len,
+ bv->bv_offset + req->offset,
+ req->offset, cur_size, err);
+
+ if (err != sizeof(struct dst_remote_request)) {
+ if (err >= 0)
+ err = -EINVAL;
+ break;
+ }
+
+ req->flags |= DST_REQ_HEADER_SENT;
+ }
+
+ if (use_csum && (bio_rw(req->bio) != WRITE) &&
+ !(req->flags & DST_REQ_CHEKSUM_RECV)) {
+ struct dst_remote_request tmp_req;
+
+ err = dst_data_recv_header(req->state->socket, &tmp_req, 0);
+ dprintk("%s: %d/%d: receiving header: start: %llu, "
+ "bv_offset: %u, bv_len: %u, "
+ "a offset: %u, offset: %u, "
+ "cur_size: %u, err: %d.\n",
+ __func__, req->idx, req->num,
+ req->start, bv->bv_offset, bv->bv_len,
+ bv->bv_offset + req->offset,
+ req->offset, cur_size, err);
+
+ if (err != sizeof(struct dst_remote_request)) {
+ if (err >= 0)
+ err = -EINVAL;
+ break;
+ }
+
+ if (req->tmp_csum) {
+ printk(KERN_ERR "%s: req: %p, old csum: %x, new: %x.\n",
+ __func__, req, req->tmp_csum,
+ be32_to_cpu(tmp_req.csum));
+ BUG_ON(1);
+ }
+
+ dprintk("%s: req: %p, old csum: %x, new: %x.\n",
+ __func__, req, req->tmp_csum,
+ be32_to_cpu(tmp_req.csum));
+ req->tmp_csum = be32_to_cpu(tmp_req.csum);
+
+ req->flags |= DST_REQ_CHEKSUM_RECV;
+ }
+
+ err = func(req->state, bv, req->offset, cur_size);
+ if (err <= 0)
+ break;
+
+ req->offset += err;
+ req->size -= err;
+
+ if (req->offset != bv->bv_len) {
+ dprintk("%s: %d/%d: this: start: %llu, bv_offset: %u, "
+ "bv_len: %u, offset: %u, "
+ "cur_size: %u, err: %d.\n",
+ __func__, req->idx, req->num, req->start,
+ bv->bv_offset, bv->bv_len,
+ req->offset, cur_size, err);
+ err = -EAGAIN;
+ break;
+ }
+
+ if (use_csum && bio_rw(req->bio) != WRITE) {
+ u32 csum = dst_csum_bvec(bv, req->tmp_offset,
+ bv->bv_len - req->tmp_offset);
+
+ dprintk("%s: req: %p, csum: %x, received csum: %x.\n",
+ __func__, req, csum, req->tmp_csum);
+
+ if (csum != req->tmp_csum) {
+ if (printk_ratelimit()) {
+ printk(KERN_INFO "%s: %d/%d: broken checksum: start: %llu, "
+ "bv_offset: %u, bv_len: %u, "
+ "a offset: %u, offset: %u, "
+ "cur_size: %u, orig_size: %llu.\n",
+ __func__, req->idx, req->num,
+ req->start, bv->bv_offset, bv->bv_len,
+ bv->bv_offset + req->offset,
+ req->offset, cur_size, req->orig_size);
+ printk(KERN_INFO "%s: broken checksum: req: %p, csum: %x, "
+ "should be: %x, flags: %x, "
+ "req->tmp_offset: %u, rw: %lu.\n",
+ __func__, req, csum, req->tmp_csum,
+ req->flags, req->tmp_offset, bio_rw(req->bio));
+ }
+
+ req->offset -= err;
+ req->size += err;
+
+ err = -EREMOTEIO;
+ break;
+ }
+ }
+
+ req->offset = 0;
+ req->idx++;
+ req->flags &= ~(DST_REQ_HEADER_SENT | DST_REQ_CHEKSUM_RECV);
+ req->tmp_csum = 0;
+ req->start += to_sector(bv->bv_len);
+ }
+
+ if (err <= 0 && err != -EAGAIN) {
+ if (err == 0)
+ err = -ECONNRESET;
+ } else
+ err = 0;
+
+ if (err < 0 || (req->idx == req->num && req->size)) {
+ dprintk("%s: return: idx: %d, num: %d, offset: %u, "
+ "size: %llu, err: %d.\n",
+ __func__, req->idx, req->num, req->offset,
+ req->size, err);
+ }
+ dprintk("%s: end: start: %llu, idx: %d, num: %d, "
+ "size: %llu, offset: %u.\n",
+ __func__, req->start, req->idx, req->num,
+ req->size, req->offset);
+
+ return err;
+}
+
+void kst_bio_endio(struct dst_request *req, int err)
+{
+ if (err && printk_ratelimit())
+ printk(KERN_INFO "%s: freeing bio: %p, bi_size: %u, "
+ "orig_size: %llu, req: %p, err: %d.\n",
+ __func__, req->bio, req->bio->bi_size, req->orig_size,
+ req, err);
+ bio_endio(req->bio, req->orig_size, err);
+}
+EXPORT_SYMBOL_GPL(kst_bio_endio);
+
+/*
+ * This callback is invoked by worker thread to process given request.
+ */
+int kst_data_callback(struct dst_request *req, unsigned int revents)
+{
+ int err;
+
+ dprintk("%s: req: %p, num: %d, idx: %d, bio: %p, "
+ "revents: %x, flags: %x.\n",
+ __func__, req, req->num, req->idx, req->bio,
+ revents, req->flags);
+
+ if (req->flags & DST_REQ_EXPORT_READ)
+ return 1;
+
+ err = kst_data_process_bio(req);
+
+ if (revents & (POLLERR | POLLHUP | POLLRDHUP))
+ err = -EPIPE;
+
+ return err;
+}
+EXPORT_SYMBOL_GPL(kst_data_callback);
+
+struct dst_request *dst_clone_request(struct dst_request *req, mempool_t *pool)
+{
+ struct dst_request *new_req;
+
+ new_req = mempool_alloc(pool, GFP_NOIO);
+ if (!new_req)
+ return NULL;
+
+ memset(new_req, 0, sizeof(struct dst_request));
+
+ dprintk("%s: req: %p, new_req: %p.\n", __func__, req, new_req);
+
+ if (req) {
+ new_req->bio = req->bio;
+ new_req->state = req->state;
+ new_req->node = req->node;
+ new_req->idx = req->idx;
+ new_req->num = req->num;
+ new_req->size = req->size;
+ new_req->orig_size = req->orig_size;
+ new_req->offset = req->offset;
+ new_req->tmp_offset = req->tmp_offset;
+ new_req->tmp_csum = req->tmp_csum;
+ new_req->start = req->start;
+ new_req->flags = req->flags;
+ new_req->bio_endio = req->bio_endio;
+ new_req->priv = req->priv;
+ }
+
+ return new_req;
+}
+EXPORT_SYMBOL_GPL(dst_clone_request);
+
+void dst_free_request(struct dst_request *req)
+{
+ dprintk("%s: free req: %p, pool: %p, bio: %p, state: %p, node: %p.\n",
+ __func__, req, req->node->w->req_pool,
+ req->bio, req->state, req->node);
+ mempool_free(req, req->node->w->req_pool);
+}
+EXPORT_SYMBOL_GPL(dst_free_request);
+
+/*
+ * This is main data processing function, eventually invoked from block layer.
+ * It tries to complte request, but if it is about to block, it allocates
+ * new request and queues it to main worker to be processed when events allow.
+ */
+static int kst_data_push(struct dst_request *req)
+{
+ struct kst_state *st = req->state;
+ struct dst_request *new_req;
+ unsigned int revents;
+ int err, locked = 0;
+
+ dprintk("%s: start: %llu, size: %llu, bio: %p.\n",
+ __func__, req->start, req->size, req->bio);
+
+ if (!list_empty(&st->request_list) || (req->flags & DST_REQ_ALWAYS_QUEUE))
+ goto alloc_new_req;
+
+ if (mutex_trylock(&st->request_lock)) {
+ locked = 1;
+
+ if (!list_empty(&st->request_list))
+ goto alloc_new_req;
+
+ revents = st->socket->ops->poll(NULL, st->socket, NULL);
+ if (revents & POLLOUT) {
+ err = kst_data_process_bio(req);
+ if (err < 0)
+ goto out_unlock;
+
+ if (!req->size)
+ goto out_bio_endio;
+ }
+ }
+
+alloc_new_req:
+ err = -ENOMEM;
+ new_req = dst_clone_request(req, req->node->w->req_pool);
+ if (!new_req)
+ goto out_unlock;
+
+ new_req->callback = &kst_data_callback;
+
+ if (!locked)
+ mutex_lock(&st->request_lock);
+
+ locked = 1;
+
+ err = kst_enqueue_req(st, new_req);
+ if (err)
+ goto out_unlock;
+ mutex_unlock(&st->request_lock);
+
+ err = 0;
+ goto out;
+
+out_bio_endio:
+ req->bio_endio(req, err);
+out_unlock:
+ if (locked)
+ mutex_unlock(&st->request_lock);
+ locked = 0;
+
+ if (err) {
+ err = kst_error(st, err);
+ if (!err)
+ goto alloc_new_req;
+ }
+
+ if (err && printk_ratelimit()) {
+ printk(KERN_INFO "%s: error [%c], start: %llu, idx: %d, num: %d, "
+ "size: %llu, offset: %u, err: %d.\n",
+ __func__, (bio_rw(req->bio) == WRITE)?'W':'R',
+ req->start, req->idx, req->num, req->size,
+ req->offset, err);
+ }
+
+out:
+
+ kst_wake(st);
+ return err;
+}
+
+/*
+ * Remote node initialization callback.
+ */
+static int kst_data_init(struct kst_state *st, void *data)
+{
+ int err;
+
+ st->socket = data;
+ st->socket->sk->sk_allocation = GFP_NOIO;
+ /*
+ * Why not?
+ */
+ st->socket->sk->sk_sndbuf = st->socket->sk->sk_sndbuf = 1024*1024*10;
+
+ err = kst_poll_init(st);
+ if (err)
+ return err;
+
+ return 0;
+}
+
+/*
+ * Remote node recovery function - tries to reconnect to given target.
+ */
+static int kst_data_recovery(struct kst_state *st, int err)
+{
+ struct socket *sock;
+ struct sockaddr addr;
+ int addrlen;
+ struct dst_request *req;
+
+ if (err != -ECONNRESET && err != -EPIPE) {
+ dprintk("%s: state %p does not know how "
+ "to recover from error %d.\n",
+ __func__, st, err);
+ return err;
+ }
+
+ err = sock_create(st->socket->ops->family, st->socket->type,
+ st->socket->sk->sk_protocol, &sock);
+ if (err < 0)
+ goto err_out_exit;
+
+ sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo =
+ msecs_to_jiffies(DST_DEFAULT_TIMEO);
+
+ err = sock->ops->getname(st->socket, &addr, &addrlen, 2);
+ if (err)
+ goto err_out_destroy;
+
+ err = sock->ops->connect(sock, &addr, addrlen, 0);
+ if (err)
+ goto err_out_destroy;
+
+ kst_poll_exit(st);
+ kst_sock_release(st);
+
+ mutex_lock(&st->request_lock);
+ err = st->ops->init(st, sock);
+ if (!err) {
+ /*
+ * After reconnection is completed all requests
+ * must be resent from the state they were finished previously,
+ * but with new headers.
+ */
+ list_for_each_entry(req, &st->request_list, request_list_entry)
+ req->flags &= ~(DST_REQ_HEADER_SENT | DST_REQ_CHEKSUM_RECV);
+ }
+ mutex_unlock(&st->request_lock);
+ if (err < 0)
+ goto err_out_destroy;
+
+ kst_wake(st);
+ dprintk("%s: reconnected.\n", __func__);
+
+ return 0;
+
+err_out_destroy:
+ sock_release(sock);
+err_out_exit:
+ dprintk("%s: recovery failed: st: %p, err: %d.\n", __func__, st, err);
+ return err;
+}
+
+/*
+ * Local exporting node end IO callbacks.
+ */
+static int kst_export_write_end_io(struct bio *bio, unsigned int size, int err)
+{
+ dprintk("%s: bio: %p, size: %u, idx: %d, num: %d, err: %d.\n",
+ __func__, bio, bio->bi_size, bio->bi_idx, bio->bi_vcnt, err);
+
+ if (bio->bi_size)
+ return 1;
+
+ kst_export_put_bio(bio);
+ return 0;
+}
+
+static int kst_export_read_end_io(struct bio *bio, unsigned int size, int err)
+{
+ struct dst_request *req = bio->bi_private;
+ struct kst_state *st = req->state;
+ int use_csum = test_bit(DST_NODE_USE_CSUM, &req->node->flags);
+
+ dprintk("%s: bio: %p, req: %p, size: %u, idx: %d, num: %d, err: %d.\n",
+ __func__, bio, req, bio->bi_size, bio->bi_idx,
+ bio->bi_vcnt, err);
+
+ if (bio->bi_size)
+ return 1;
+
+ if (err) {
+ kst_export_put_bio(bio);
+ return 0;
+ }
+
+ bio->bi_size = req->size = req->orig_size;
+ bio->bi_rw = WRITE;
+ bio->bi_end_io = kst_export_write_end_io;
+ if (use_csum)
+ req->flags &= ~(DST_REQ_HEADER_SENT | DST_REQ_CHEKSUM_RECV);
+
+ /*
+ * This is a race with kst_data_callback(), which checks
+ * this bit to determine if it can or can not process given
+ * request. This does not harm actually, since subsequent
+ * state wakeup will call it again and thus will pick
+ * given request in time.
+ */
+ req->flags &= ~DST_REQ_EXPORT_READ;
+ kst_wake(st);
+ return 0;
+}
+
+/*
+ * This callback is invoked each time new request from remote
+ * node to given local export node is received.
+ * It allocates new block IO request and queues it for processing.
+ */
+static int kst_export_ready(struct kst_state *st)
+{
+ struct dst_remote_request r;
+ struct bio *bio;
+ int err, nr, i;
+ struct dst_request *req;
+ unsigned int revents = st->socket->ops->poll(NULL, st->socket, NULL);
+
+ if (revents & (POLLERR | POLLHUP)) {
+ err = -EPIPE;
+ goto err_out_exit;
+ }
+
+ if (!(revents & POLLIN) || !list_empty(&st->request_list))
+ return 0;
+
+ err = dst_data_recv_header(st->socket, &r, 1);
+ if (err != sizeof(struct dst_remote_request)) {
+ err = -ECONNRESET;
+ goto err_out_exit;
+ }
+
+ kst_convert_header(&r);
+
+ dprintk("\n%s: st: %p, cmd: %u, sector: %llu, size: %u, "
+ "csum: %x, offset: %u.\n",
+ __func__, st, r.cmd, r.sector,
+ r.size, r.csum, r.offset);
+
+ err = -EINVAL;
+ if (r.cmd != DST_READ && r.cmd != DST_WRITE && r.cmd != DST_REMOTE_CFG)
+ goto err_out_exit;
+
+ if ((s64)(r.sector + to_sector(r.size)) < 0 ||
+ (r.sector + to_sector(r.size)) > st->node->size ||
+ r.offset >= PAGE_SIZE)
+ goto err_out_exit;
+
+ if (r.cmd == DST_REMOTE_CFG) {
+ r.sector = st->node->size;
+
+ if (test_bit(DST_NODE_USE_CSUM, &st->node->flags))
+ r.csum = 1;
+
+ kst_convert_header(&r);
+
+ err = dst_data_send_header(st->socket, &r);
+ if (err != sizeof(struct dst_remote_request)) {
+ err = -EINVAL;
+ goto err_out_exit;
+ }
+ kst_wake(st);
+ return 0;
+ }
+
+ nr = DIV_ROUND_UP(r.size, PAGE_SIZE);
+
+ while (r.size) {
+ int nr_pages = min(BIO_MAX_PAGES, nr);
+ unsigned int size;
+ struct page *page;
+
+ err = -ENOMEM;
+ req = dst_clone_request(NULL, st->node->w->req_pool);
+ if (!req)
+ goto err_out_exit;
+
+ bio = bio_alloc(GFP_NOIO, nr_pages);
+ if (!bio)
+ goto err_out_free_req;
+
+ req->flags = DST_REQ_EXPORT | DST_REQ_HEADER_SENT |
+ DST_REQ_CHEKSUM_RECV;
+ req->bio = bio;
+ req->state = st;
+ req->node = st->node;
+ req->callback = &kst_data_callback;
+ req->bio_endio = &kst_bio_endio;
+
+ req->tmp_offset = 0;
+ req->tmp_csum = r.csum;
+
+ /*
+ * Yes, looks a bit weird.
+ * Logic is simple - for local exporting node all operations
+ * are reversed compared to usual nodes, since usual nodes
+ * process remote data and local export node process remote
+ * requests, so that writing data means sending data to
+ * remote node and receiving on the local export one.
+ *
+ * So, to process writing to the exported node we need first
+ * to receive data from the net (i.e. to perform READ
+ * operationin terms of usual node), and then put it to the
+ * storage (WRITE command, so it will be changed before
+ * calling generic_make_request()).
+ *
+ * To process read request from the exported node we need
+ * first to read it from storage (READ command for BIO)
+ * and then send it over the net (perform WRITE operation
+ * in terms of network).
+ */
+ if (r.cmd == DST_WRITE) {
+ req->flags |= DST_REQ_EXPORT_WRITE;
+ bio->bi_end_io = kst_export_write_end_io;
+ } else {
+ req->flags |= DST_REQ_EXPORT_READ;
+ bio->bi_end_io = kst_export_read_end_io;
+ }
+ bio->bi_rw = READ;
+ bio->bi_private = req;
+ bio->bi_sector = r.sector;
+ bio->bi_bdev = st->node->bdev;
+
+ for (i = 0; i < nr_pages; ++i) {
+ page = alloc_page(GFP_NOIO);
+ if (!page)
+ break;
+
+ size = min_t(u32, PAGE_SIZE - r.offset, r.size);
+
+ err = bio_add_page(bio, page, size, 0);
+ dprintk("%s: %d/%d: page: %p, size: %u, "
+ "offset: %u (used zero), err: %d.\n",
+ __func__, i, nr_pages, page, size,
+ r.offset, err);
+ if (err <= 0)
+ break;
+
+ if (err == size)
+ nr--;
+
+ r.size -= err;
+ r.sector += to_sector(err);
+
+ if (!r.size)
+ break;
+ }
+
+ if (!bio->bi_vcnt) {
+ err = -ENOMEM;
+ goto err_out_put;
+ }
+
+ req->size = req->orig_size = bio->bi_size;
+ req->start = bio->bi_sector;
+ req->idx = 0;
+ req->num = bio->bi_vcnt;
+
+ dprintk("%s: submitting: bio: %p, req: %p, start: %llu, "
+ "size: %llu, idx: %d, num: %d, offset: %u, csum: %x.\n",
+ __func__, bio, req, req->start, req->size,
+ req->idx, req->num, req->offset, req->tmp_csum);
+
+ err = kst_enqueue_req(st, req);
+ if (err)
+ goto err_out_put;
+
+ if (r.cmd == DST_READ) {
+ generic_make_request(bio);
+ }
+ }
+
+ kst_wake(st);
+ return 0;
+
+err_out_put:
+ bio_put(bio);
+err_out_free_req:
+ dst_free_request(req);
+err_out_exit:
+ return err;
+}
+
+static void kst_export_exit(struct kst_state *st)
+{
+ struct dst_node *n = st->node;
+
+ kst_common_exit(st);
+ dst_node_put(n);
+}
+
+static struct kst_state_ops kst_data_export_ops = {
+ .init = &kst_data_init,
+ .push = &kst_data_push,
+ .exit = &kst_export_exit,
+ .ready = &kst_export_ready,
+};
+
+/*
+ * This callback is invoked each time listening socket for
+ * given local export node becomes ready.
+ * It creates new state for connected client and queues for processing.
+ */
+static int kst_listen_ready(struct kst_state *st)
+{
+ struct socket *newsock;
+ struct saddr addr;
+ struct kst_state *newst;
+ int err;
+ unsigned int revents, permissions = 0;
+ struct dst_secure *s;
+
+ revents = st->socket->ops->poll(NULL, st->socket, NULL);
+ if (!(revents & POLLIN))
+ return 1;
+
+ err = sock_create(st->socket->ops->family, st->socket->type,
+ st->socket->sk->sk_protocol, &newsock);
+ if (err)
+ goto err_out_exit;
+
+ err = st->socket->ops->accept(st->socket, newsock, 0);
+ if (err)
+ goto err_out_put;
+
+ if (newsock->ops->getname(newsock, (struct sockaddr *)&addr,
+ (int *)&addr.sa_data_len, 2) < 0) {
+ err = -ECONNABORTED;
+ goto err_out_put;
+ }
+
+ list_for_each_entry(s, &st->request_list, sec_entry) {
+ void *sec_addr, *new_addr;
+
+ sec_addr = ((void *)&s->sec.addr) + s->sec.check_offset;
+ new_addr = ((void *)&addr) + s->sec.check_offset;
+
+ if (!memcmp(sec_addr, new_addr,
+ addr.sa_data_len - s->sec.check_offset)) {
+ permissions = s->sec.permissions;
+ break;
+ }
+ }
+
+ /*
+ * So far only reading and writing are supported.
+ * Block device does not know about anything else,
+ * but as far as I recall, there was a prognosis,
+ * that computer will never require more than 640kb of RAM.
+ */
+ if (permissions == 0) {
+ err = -EPERM;
+ goto err_out_put;
+ }
+
+ if (st->socket->ops->family == AF_INET) {
+ struct sockaddr_in *sin = (struct sockaddr_in *)&addr;
+ printk(KERN_INFO "%s: Client: %u.%u.%u.%u:%d.\n", __func__,
+ NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
+ } else if (st->socket->ops->family == AF_INET6) {
+ struct sockaddr_in6 *sin = (struct sockaddr_in6 *)&addr;
+ printk(KERN_INFO "%s: Client: "
+ "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d",
+ __func__,
+ NIP6(sin->sin6_addr), ntohs(sin->sin6_port));
+ }
+
+ dst_node_get(st->node);
+ newst = kst_state_init(st->node, permissions,
+ &kst_data_export_ops, newsock);
+ if (IS_ERR(newst)) {
+ err = PTR_ERR(newst);
+ goto err_out_put;
+ }
+
+ /*
+ * Negative return value means error, positive - stop this state
+ * processing. Zero allows to check state for pending requests.
+ * Listening socket contains security objects in request list,
+ * since it does not have any requests.
+ */
+ return 1;
+
+err_out_put:
+ sock_release(newsock);
+err_out_exit:
+ return 1;
+}
+
+static int kst_listen_init(struct kst_state *st, void *data)
+{
+ int err = -ENOMEM, i;
+ struct dst_le_template *tmp = data;
+ struct dst_secure *s;
+
+ for (i=0; i<tmp->le->secure_attr_num; ++i) {
+ s = kmalloc(sizeof(struct dst_secure), GFP_KERNEL);
+ if (!s)
+ goto err_out_exit;
+
+ memcpy(&s->sec, tmp->data, sizeof(struct dst_secure_user));
+
+ list_add_tail(&s->sec_entry, &st->request_list);
+ tmp->data += sizeof(struct dst_secure_user);
+
+ if (s->sec.addr.sa_family == AF_INET) {
+ struct sockaddr_in *sin =
+ (struct sockaddr_in *)&s->sec.addr;
+ printk(KERN_INFO "%s: Client: %u.%u.%u.%u:%d, "
+ "permissions: %x.\n",
+ __func__, NIPQUAD(sin->sin_addr.s_addr),
+ ntohs(sin->sin_port), s->sec.permissions);
+ } else if (s->sec.addr.sa_family == AF_INET6) {
+ struct sockaddr_in6 *sin =
+ (struct sockaddr_in6 *)&s->sec.addr;
+ printk(KERN_INFO "%s: Client: "
+ "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d, "
+ "permissions: %x.\n",
+ __func__, NIP6(sin->sin6_addr),
+ ntohs(sin->sin6_port), s->sec.permissions);
+ }
+ }
+
+ err = kst_sock_create(st, &tmp->le->rctl.addr, tmp->le->rctl.type,
+ tmp->le->rctl.proto, tmp->le->backlog);
+ if (err)
+ goto err_out_exit;
+
+ err = kst_poll_init(st);
+ if (err)
+ goto err_out_release;
+
+ return 0;
+
+err_out_release:
+ kst_sock_release(st);
+err_out_exit:
+ kst_listen_flush(st);
+ return err;
+}
+
+/*
+ * Operations for different types of states.
+ * There are three:
+ * data state - created for remote node, when distributed storage connects
+ * to remote node, which contain data.
+ * listen state - created for local export node, when remote distributed
+ * storage's node connects to given node to get/put data.
+ * data export state - created for each client connected to above listen
+ * state.
+ */
+static struct kst_state_ops kst_listen_ops = {
+ .init = &kst_listen_init,
+ .exit = &kst_listen_exit,
+ .ready = &kst_listen_ready,
+};
+static struct kst_state_ops kst_data_ops = {
+ .init = &kst_data_init,
+ .push = &kst_data_push,
+ .exit = &kst_common_exit,
+ .recovery = &kst_data_recovery,
+};
+
+struct kst_state *kst_listener_state_init(struct dst_node *node,
+ struct dst_le_template *tmp)
+{
+ return kst_state_init(node, DST_PERM_READ | DST_PERM_WRITE,
+ &kst_listen_ops, tmp);
+}
+
+struct kst_state *kst_data_state_init(struct dst_node *node,
+ struct socket *newsock)
+{
+ return kst_state_init(node, DST_PERM_READ | DST_PERM_WRITE,
+ &kst_data_ops, newsock);
+}
+
+/*
+ * Remove all workers and associated states.
+ */
+void kst_exit_all(void)
+{
+ struct kst_worker *w, *n;
+
+ list_for_each_entry_safe(w, n, &kst_worker_list, entry) {
+ kst_worker_exit(w);
+ }
+}

2007-12-17 15:05:41

by Evgeniy Polyakov

[permalink] [raw]
Subject: [1/4] DST: Distributed storage documentation.


Distributed storage documentation.

Algorithms used in the system, userspace interfaces
(sysfs dirs and files), design and implementation details
are described here.

Signed-off-by: Evgeniy Polyakov <[email protected]>


diff --git a/Documentation/dst/algorithms.txt b/Documentation/dst/algorithms.txt
new file mode 100644
index 0000000..1437a6a
--- /dev/null
+++ b/Documentation/dst/algorithms.txt
@@ -0,0 +1,115 @@
+Each storage by itself is just a set of contiguous logical blocks, with
+allowed number of operations. Nodes, each of which has own start and size,
+are placed into storage by appropriate algorithm, which remaps
+logical sector number into real node's sector. One can create
+own algorithms, since DST has pluggable interface for that.
+Currently mirrored and linear algorithms are supported.
+
+Let's briefly describe how they work.
+
+Linear algorithm.
+Simple approach of concatenating storages into single device with
+increased size is used in this algorithm. Essentially new device
+has size equal to sum of sizes of underlying nodes and nodes are
+placed one after another.
+
+ /----- Node 1 ---\ /------ Node 3 ----\
+start end start end
+ |==================|========================|==================|
+ | start end |
+ | \------- Node 2 ---------/ |
+ | |
+start end
+ \-------------------------- DST storage ----------------------/
+
+ /\
+ ||
+ ||
+
+ IO operations
+
+ Figure 1.
+ 3 nodes combined into single storage using linear algorithm.
+
+Mirror algorithm.
+In this algorithms nodes are placed under each other, so when
+operation comes to the first one, it can be mirrored to all
+underlying nodes. In case of reading, actual data is obtained from
+the nearest node - algoritm keeps track of previous operation
+and knows where it was stopped, so that subsequent seek to the
+start of the new request will take the shortest time.
+Writing is always mirrored to all underlying nodes.
+
+ IO operations
+ ||
+ ||
+ \/
+
+|---------------- DST storage -------------------|
+| prev position |
+|-------|------------ Node 1 --------------------|
+| prev pos |
+|-------------------- Node 2 -----|--------------|
+|prev pos |
+|---|---------------- Node 3 --------------------|
+
+ Figure 2.
+ 3 nodes combined into single storage using mirror algorithm.
+
+Each algorithm must implement number of callbacks,
+which must be registered during initialization time.
+
+struct dst_alg_ops
+{
+ int (*add_node)(struct dst_node *n);
+ void (*del_node)(struct dst_node *n);
+ int (*remap)(struct dst_request *req);
+ int (*error)(struct kst_state *state, int err);
+ struct module *owner;
+};
+
+@add_node.
+This callback is invoked when new node is being added into the storage,
+but before node is actually added into the storage, so that it could
+be accessed from it. When it is called, all appropriate initialization
+of the underlying device is already completed (system has been connected
+to remote node or got a reference to the local block device). At this
+stage algorithm can add node into private map.
+It must return zero on success or negative value otherwise.
+
+@del_node.
+This callback is invoked when node is being deleted from the storage,
+i.e. when its reference counter hits zero. It is called before
+any cleaning is performed.
+It must return zero on success or negative value otherwise.
+
+@remap.
+This callback is invoked each time new bio hits the storage.
+Request structure contains BIO itself, pointer to the node, which originally
+stores the whole region under given IO request, and various parameters
+used by storage core to process this block request.
+It must return zero on success or negative value otherwise. It is upto
+this method to call all cleaning if remapping failed, for example it must
+call kst_bio_endio() for given callback in case of error, which in turn
+will call bio_endio(). Note, that dst_request structure provided in this
+callback is allocated on stack, so if there is a need to use it outside
+of the given function, it must be cloned (it will happen automatically
+in state's push callback, but that copy will not be shared by any other
+user).
+
+@error.
+This callback is invoked for each error, which happend when processed
+requests for remote nodes or when talking to remote size
+of the local export node (state contains data related to data
+transfers over the network).
+If this function has fixed given error, it must return 0 or negative
+error value otherwise.
+
+@owner.
+This is module reference counter updated automatically by DST core.
+
+Algorithm must provide its name and above structure to the
+dst_alloc_alg() function, which will return a reference to the newly
+created algorithm.
+To remove it, one needs to call dst_remove_alg() with given algorithm
+pointer.
diff --git a/Documentation/dst/dst.txt b/Documentation/dst/dst.txt
new file mode 100644
index 0000000..a6ea126
--- /dev/null
+++ b/Documentation/dst/dst.txt
@@ -0,0 +1,69 @@
+Distributed storage. Design and implementation.
+http://tservice.net.ru/~s0mbre/old/?section=projects&item=dst
+
+ Evgeniy Polyakov
+
+This document is intended to briefly describe design and
+implementation details of the distributed storage project,
+aimed to create ability to group physically and/or logically
+distributed storages into single device.
+
+Main operational unit in the storage is node. Node can represent
+either remote storage, connected to local machine, or local
+device, or storage exported to the outside of the system.
+Here goes small explaination of basic therms.
+
+Local node.
+This node is just a logical link between block device (with given
+major and minor numbers) and structure in the DST hierarchy,
+which represents number of sectors on the area, corresponding to given
+block device. it can be a disk, a device mapper node or stacked
+block device on top of another underlying DST nodes.
+
+Local export node.
+Essentially the same as local node, but it allows to access
+to its data via network. Remote clients can connect to given local
+export node and read or write blocks according to its size.
+Blocks are then forwarded to underlying local node and processed
+there accordingly to the nature of the local node.
+
+Remote node.
+This type of nodes contain remotely accessible devices. One can think
+about remote nodes as remote disks, which can be connected to
+local system and combined into single storage. Remote nodes
+are presented as number of sectors accessed over the network
+by the local machine, where distributed storage is being formed.
+Remote node allows autoconfiguration - size of the storage and
+checksumming will be requested during node initialization (if remote
+node supports checksumming it will be turned on).
+
+
+Each node or set of them can be formed into single array, which
+in turn becomes a local node, which can be exported further by stacking
+a local export node on top of it.
+
+Each storage by itself is just a set of contiguous logical blocks, with
+allowed number of operations. Nodes, each of which has own start and size,
+are placed into storage by appropriate algorithm, which remaps
+logical sector number into real node's sector. One can create
+own algorithms, since DST has pluggable interface for that.
+Currently mirrored and linear algorithms are supported.
+One can find more details in Documentation/dst/algorithms.txt file.
+
+Main goal of the distributed storage is to combine remote nodes into
+single device, so each block IO request is being sent over the network
+(contrary requests for local nodes are handled by the gneric block
+layer features). Each network connection has number of variables which
+describe it (socket, list of requests, error handling and so on),
+which form kst_state structure. This network state is added into per-socket
+polling state machine, and can be processed by dedicated thread when
+becomes ready. This system forms asynchronous IO for given block
+requests. If block request can be processed without blocking, then
+no new structures are allocated and async part of the state is not used.
+
+When connection to the remote peer breaks, DST core tries to reconnect
+to failed node and no requests are marked as errorneous, instead
+they live in the queue until reconnectin is established.
+
+Userspace code, setup documentation and examples can be found on project's
+homepage above.
diff --git a/Documentation/dst/sysfs.txt b/Documentation/dst/sysfs.txt
new file mode 100644
index 0000000..63e5c00
--- /dev/null
+++ b/Documentation/dst/sysfs.txt
@@ -0,0 +1,33 @@
+This file describes sysfs files created for each storage.
+
+1. Per-storage files.
+Each storage has its own dir /sysfs/devices/$storage_name,
+which contains following files:
+
+alg - contains name of the algorithm used to created given storage
+name - name of the storage
+nodes - map of the storage (list of nodes and their sizes and starts)
+remove_all_nodes - writable file which allows to remove all nodes from given
+ storage
+n-$start-$cookie - per node directory, where
+ $start - start of the given node in sectors,
+ $cookie - unique node's id used by DST
+
+2. Per-node files.
+Node's files are located in /sysfs/devices/$storage_name/n-$start-$cookie
+directory, described above.
+
+clean - writable file, writing leads to marking node as clean (in sync)
+dirty - writable file, writing leads to marking node as dirty (not in sync)
+resync_size - size of the resync sliding window
+resync_timout - resync requests checking timeout (in milliseconds) of
+ the background worker thread
+state - sync/notsync state of the node in the mirror
+
+flags - flags of the given node
+size - size of the given node in sectors
+start - start of the given node in the storage in sectors
+type - contains type of the node in the following format: $type: $dev
+ where $type is either 'L' or 'R' - local or remote acordingly,
+ and $dev is device name for local node (/dev/sda1 for example)
+ or address of the remote node (192.168.4.81:1025 for example)

2007-12-17 15:06:00

by Evgeniy Polyakov

[permalink] [raw]
Subject: [4/4] DST: Algorithms used in distributed storage.


Algorithms used in distributed storage.
Mirror and linear mapping code.

Signed-off-by: Evgeniy Polyakov <[email protected]>


diff --git a/drivers/block/dst/alg_linear.c b/drivers/block/dst/alg_linear.c
new file mode 100644
index 0000000..836764d
--- /dev/null
+++ b/drivers/block/dst/alg_linear.c
@@ -0,0 +1,114 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/dst.h>
+
+static struct dst_alg *alg_linear;
+
+/*
+ * This callback is invoked when node is removed from storage.
+ */
+static void dst_linear_del_node(struct dst_node *n)
+{
+}
+
+/*
+ * This callback is invoked when node is added to storage.
+ */
+static int dst_linear_add_node(struct dst_node *n)
+{
+ struct dst_storage *st = n->st;
+ struct block_device *bdev;
+
+ dprintk("%s: disk_size: %llu, node_size: %llu.\n",
+ __func__, st->disk_size, n->size);
+
+ mutex_lock(&st->tree_lock);
+ n->start = st->disk_size;
+ st->disk_size += n->size;
+ set_capacity(st->disk, st->disk_size);
+
+ bdev = bdget_disk(st->disk, 0);
+ if (bdev) {
+ mutex_lock(&bdev->bd_inode->i_mutex);
+ i_size_write(bdev->bd_inode, to_bytes(st->disk_size));
+ mutex_unlock(&bdev->bd_inode->i_mutex);
+ bdput(bdev);
+ }
+ mutex_unlock(&st->tree_lock);
+
+ return 0;
+}
+
+static int dst_linear_remap(struct dst_request *req)
+{
+ int err;
+
+ if (req->node->bdev) {
+ generic_make_request(req->bio);
+ return 0;
+ }
+
+ err = kst_check_permissions(req->state, req->bio);
+ if (err)
+ return err;
+
+ return req->state->ops->push(req);
+}
+
+/*
+ * Failover callback - it is invoked each time error happens during
+ * request processing.
+ */
+static int dst_linear_error(struct kst_state *st, int err)
+{
+ if (err)
+ set_bit(DST_NODE_FROZEN, &st->node->flags);
+ else
+ clear_bit(DST_NODE_FROZEN, &st->node->flags);
+ return 0;
+}
+
+static struct dst_alg_ops alg_linear_ops = {
+ .remap = dst_linear_remap,
+ .add_node = dst_linear_add_node,
+ .del_node = dst_linear_del_node,
+ .error = dst_linear_error,
+ .owner = THIS_MODULE,
+};
+
+static int __devinit alg_linear_init(void)
+{
+ alg_linear = dst_alloc_alg("alg_linear", &alg_linear_ops);
+ if (!alg_linear)
+ return -ENOMEM;
+
+ return 0;
+}
+
+static void __devexit alg_linear_exit(void)
+{
+ dst_remove_alg(alg_linear);
+}
+
+module_init(alg_linear_init);
+module_exit(alg_linear_exit);
+
+MODULE_LICENSE("GPL");
+MODULE_AUTHOR("Evgeniy Polyakov <[email protected]>");
+MODULE_DESCRIPTION("Linear distributed algorithm.");
diff --git a/drivers/block/dst/alg_mirror.c b/drivers/block/dst/alg_mirror.c
new file mode 100644
index 0000000..c10d582
--- /dev/null
+++ b/drivers/block/dst/alg_mirror.c
@@ -0,0 +1,1536 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/poll.h>
+#include <linux/dst.h>
+#include <linux/vmstat.h>
+
+struct dst_write_entry
+{
+ int error;
+ u32 size;
+ u64 start;
+};
+#define DST_LOG_ENTRIES_PER_PAGE (PAGE_SIZE/sizeof(struct dst_write_entry))
+
+struct dst_mirror_node_data
+{
+ u64 age;
+ u64 num, write_idx, resync_idx;
+};
+
+struct dst_mirror_log
+{
+ unsigned int nr_pages;
+ struct dst_write_entry **entries;
+};
+
+struct dst_mirror_priv
+{
+ u64 resync_start, resync_size;
+ atomic_t resync_num;
+ struct completion resync_complete;
+ struct delayed_work resync_work;
+ unsigned int resync_timeout;
+
+ u64 last_start;
+
+ spinlock_t resync_wait_lock;
+ struct list_head resync_wait_list;
+ int resync_wait_num;
+ int full_resync;
+
+ spinlock_t backlog_lock;
+ struct list_head backlog_list;
+
+ struct dst_node *node;
+
+ u64 old_age, ndp_sector;
+ struct dst_mirror_node_data data;
+
+ spinlock_t log_lock;
+ struct dst_mirror_log log;
+};
+
+struct dst_mirror_sync_container
+{
+ struct list_head sync_entry;
+ u64 start, size;
+ struct dst_node *node;
+ struct bio *bio;
+};
+
+static struct dst_alg *alg_mirror;
+static struct bio_set *dst_mirror_bio_set;
+
+static int dst_mirror_resync(struct dst_node *n, int ndp);
+
+static int dst_mirror_mark_notsync(struct dst_node *n)
+{
+ if (!test_and_set_bit(DST_NODE_NOTSYNC, &n->flags)) {
+ struct dst_mirror_priv *priv = n->priv;
+ printk(KERN_NOTICE "%s: not synced node n: %p.\n", __func__, n);
+
+ priv->data.resync_idx = priv->data.write_idx;
+ return 1;
+ }
+
+ return 0;
+}
+
+static void dst_mirror_mark_node_sync(struct dst_node *n)
+{
+ struct dst_mirror_priv *priv = n->priv;
+
+ if (test_and_clear_bit(DST_NODE_NOTSYNC, &n->flags))
+ printk(KERN_NOTICE "%s: node: %p, %llu:%llu synchronization "
+ "has been completed.\n",
+ __func__, n, n->start, n->size);
+
+ priv->full_resync = 0;
+ complete(&priv->resync_complete);
+}
+
+static ssize_t dst_mirror_mark_dirty(struct device *dev, struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+
+ dst_mirror_mark_notsync(n);
+ return count;
+}
+
+static ssize_t dst_mirror_mark_clean(struct device *dev, struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+
+ dst_mirror_mark_node_sync(n);
+ return count;
+}
+
+static ssize_t dst_mirror_show_state(struct device *dev, struct device_attribute *attr,
+ char *buf)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+
+ return sprintf(buf, "%s\n", test_bit(DST_NODE_NOTSYNC, &n->flags) ? "notsync" : "sync");
+}
+
+static ssize_t dst_mirror_show_resync_timeout(struct device *dev, struct device_attribute *attr,
+ char *buf)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+ struct dst_mirror_priv *priv = n->priv;
+
+ return sprintf(buf, "%u\n", priv->resync_timeout);
+}
+
+static ssize_t dst_mirror_show_resync_size(struct device *dev, struct device_attribute *attr,
+ char *buf)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+ struct dst_mirror_priv *priv = n->priv;
+
+ return sprintf(buf, "%llu\n", priv->resync_size);
+}
+
+static ssize_t dst_mirror_set_resync_size(struct device *dev, struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+ struct dst_mirror_priv *priv = n->priv;
+ unsigned long size;
+
+ size = simple_strtoul(buf, NULL, 0);
+
+ if (size > n->st->disk_size)
+ return -E2BIG;
+
+ priv->resync_size = size;
+
+ return count;
+}
+
+static ssize_t dst_mirror_show_log_num(struct device *dev, struct device_attribute *attr,
+ char *buf)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+ struct dst_mirror_priv *priv = n->priv;
+
+ return sprintf(buf, "%llu\n", priv->data.num);
+}
+
+static ssize_t dst_mirror_set_resync_timeout(struct device *dev, struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct dst_node *n = container_of(dev, struct dst_node, device);
+ struct dst_mirror_priv *priv = n->priv;
+ unsigned long tm;
+
+ tm = simple_strtoul(buf, NULL, 0);
+
+ if (tm < 100 || tm > 30000)
+ return -EINVAL;
+
+ priv->resync_timeout = (unsigned int)tm;
+
+ return count;
+}
+
+static struct device_attribute dst_mirror_attrs[] = {
+ __ATTR(dirty, S_IWUSR, NULL, dst_mirror_mark_dirty),
+ __ATTR(clean, S_IWUSR, NULL, dst_mirror_mark_clean),
+ __ATTR(resync_size, S_IWUSR | S_IRUGO, dst_mirror_show_resync_size,
+ dst_mirror_set_resync_size),
+ __ATTR(resync_timeout, S_IWUSR | S_IRUGO, dst_mirror_show_resync_timeout,
+ dst_mirror_set_resync_timeout),
+ __ATTR(state, S_IRUSR, dst_mirror_show_state, NULL),
+ __ATTR(log_num, S_IRUSR, dst_mirror_show_log_num, NULL),
+};
+
+static void dst_mirror_handle_priv(struct dst_node *n)
+{
+ if (n->priv) {
+ int err, i;
+
+ for (i=0; i<ARRAY_SIZE(dst_mirror_attrs); ++i)
+ err = device_create_file(&n->device,
+ &dst_mirror_attrs[i]);
+ }
+}
+
+static void dst_mirror_destructor(struct bio *bio)
+{
+ dprintk("%s: bio: %p.\n", __func__, bio);
+ bio_free(bio, dst_mirror_bio_set);
+}
+
+struct dst_mirror_ndp
+{
+ int err;
+ struct page *page;
+ struct completion complete;
+};
+
+static void dst_mirror_ndb_complete(struct dst_mirror_ndp *cmp, int err)
+{
+ cmp->err = err;
+ dprintk("%s: completing request: cmp: %p, err: %d.\n",
+ __func__, cmp, err);
+ complete(&cmp->complete);
+}
+
+static void dst_mirror_ndp_bio_endio(struct dst_request *req, int err)
+{
+ struct dst_mirror_ndp *cmp = req->bio->bi_private;
+
+ dst_mirror_ndb_complete(cmp, err);
+}
+
+static int dst_mirror_ndp_end_io(struct bio *bio, unsigned int size, int err)
+{
+ struct dst_mirror_ndp *cmp = bio->bi_private;
+
+ if (bio->bi_size)
+ return 0;
+
+ dst_mirror_ndb_complete(cmp, err);
+ return 0;
+}
+
+/*
+ * This function reads or writes node's private data from underlying media.
+ */
+static int dst_mirror_process_node_data(struct dst_node *n,
+ struct dst_mirror_node_data *ndata, int op)
+{
+ struct bio *bio;
+ int err = -ENOMEM;
+ struct dst_mirror_ndp *cmp;
+ struct dst_mirror_priv *priv = n->priv;
+ void *addr;
+
+ cmp = kzalloc(sizeof(struct dst_mirror_ndp), GFP_KERNEL);
+ if (!cmp)
+ goto err_out_exit;
+
+ cmp->page = alloc_page(GFP_NOIO);
+ if (!cmp->page)
+ goto err_out_free_cmp;
+
+ addr = kmap(cmp->page);
+
+ init_completion(&cmp->complete);
+
+ if (op == WRITE)
+ memcpy(addr, ndata, sizeof(struct dst_mirror_node_data));
+
+ bio = bio_alloc_bioset(GFP_NOIO, 1, dst_mirror_bio_set);
+ if (!bio)
+ goto err_out_free_page;
+
+ bio->bi_rw = op;
+ bio->bi_private = cmp;
+ bio->bi_sector = priv->ndp_sector;
+ bio->bi_bdev = n->bdev;
+ bio->bi_destructor = dst_mirror_destructor;
+ bio->bi_end_io = dst_mirror_ndp_end_io;
+
+ err = bio_add_pc_page(n->st->queue, bio, cmp->page, 512, 0);
+ if (err <= 0)
+ goto err_out_free_bio;
+
+ if (n->bdev) {
+ generic_make_request(bio);
+ } else {
+ struct dst_request req;
+
+ memset(&req, 0, sizeof(struct dst_request));
+ dst_fill_request(&req, bio, bio->bi_sector, n,
+ &dst_mirror_ndp_bio_endio);
+
+ err = req.state->ops->push(&req);
+ if (err)
+ req.bio_endio(&req, err);
+ }
+
+ dprintk("%s: waiting for completion: bio: %p, cmp: %p.\n",
+ __func__, bio, cmp);
+
+ wait_for_completion(&cmp->complete);
+
+ err = cmp->err;
+ if (!err && (op != WRITE))
+ memcpy(ndata, addr, sizeof(struct dst_mirror_node_data));
+
+ kunmap(cmp->page);
+
+ dprintk("%s: freeing bio: %p, err: %d.\n", __func__, bio, err);
+
+err_out_free_bio:
+ bio_put(bio);
+err_out_free_page:
+ kunmap(cmp->page);
+ __free_page(cmp->page);
+err_out_free_cmp:
+ kfree(cmp);
+err_out_exit:
+ return err;
+}
+
+/*
+ * This function reads node's private data from underlying media.
+ */
+static int dst_mirror_read_node_data(struct dst_node *n,
+ struct dst_mirror_node_data *ndata)
+{
+ return dst_mirror_process_node_data(n, ndata, READ);
+}
+
+/*
+ * This function writes node's private data from underlying media.
+ */
+static int dst_mirror_write_node_data(struct dst_node *n,
+ struct dst_mirror_node_data *ndata)
+{
+ dprintk("%s: writing new age: %llx, node: %p %llu-%llu.\n",
+ __func__, ndata->age, n, n->start, n->size);
+ return dst_mirror_process_node_data(n, ndata, WRITE);
+}
+
+static int dst_mirror_process_log_on_disk(struct dst_node *n, int op)
+{
+ struct dst_mirror_priv *priv = n->priv;
+ struct dst_mirror_log *log = &priv->log;
+ int err = -ENOMEM;
+ unsigned int i;
+ struct bio *bio;
+ struct dst_mirror_ndp *cmp;
+
+ cmp = kzalloc(sizeof(struct dst_mirror_ndp), GFP_KERNEL);
+ if (!cmp)
+ goto err_out_exit;
+
+ bio = bio_alloc_bioset(GFP_NOIO, 1, dst_mirror_bio_set);
+ if (!bio)
+ goto err_out_free_cmp;
+
+ bio->bi_rw = op;
+ bio->bi_private = cmp;
+ bio->bi_bdev = n->bdev;
+ bio->bi_destructor = dst_mirror_destructor;
+ bio->bi_end_io = dst_mirror_ndp_end_io;
+
+ for (i=0; i<log->nr_pages; ++i) {
+ bio->bi_sector = n->size + to_sector(i*PAGE_SIZE);
+
+ err = bio_add_pc_page(n->st->queue, bio,
+ virt_to_page(log->entries[i]), PAGE_SIZE,
+ offset_in_page(log->entries[i]));
+ if (err <= 0)
+ goto err_out_free_bio;
+
+ init_completion(&cmp->complete);
+
+ if (n->bdev) {
+ generic_make_request(bio);
+ } else {
+ struct dst_request req;
+
+ memset(&req, 0, sizeof(struct dst_request));
+ dst_fill_request(&req, bio, bio->bi_sector, n,
+ &dst_mirror_ndp_bio_endio);
+
+ err = req.state->ops->push(&req);
+ if (err)
+ req.bio_endio(&req, err);
+ }
+
+ dprintk("%s: waiting for completion: bio: %p, cmp: %p.\n",
+ __func__, bio, cmp);
+
+ wait_for_completion(&cmp->complete);
+ if (cmp->err) {
+ err = cmp->err;
+ goto err_out_free_bio;
+ }
+ }
+
+ err = dst_mirror_write_node_data(n, &priv->data);
+ if (err)
+ goto err_out_free_cmp;
+
+err_out_free_bio:
+ bio_put(bio);
+err_out_free_cmp:
+ kfree(cmp);
+err_out_exit:
+ if (err)
+ dst_mirror_mark_notsync(n);
+ return err;
+}
+
+static int dst_mirror_ndp_setup(struct dst_node *n, int first_node, int clean_on_sync)
+{
+ struct dst_mirror_priv *p = n->priv;
+ int sync = 1, err;
+ struct dst_mirror_priv *fp = p;
+ struct dst_node *first;
+
+ p->full_resync = 0;
+
+ if (first_node) {
+ u64 new_age = *(u64 *)&n->st;
+
+ p->old_age = p->data.age;
+ printk(KERN_NOTICE "%s: first age: %llx -> %llx. "
+ "Old will be set to new for the first node.\n",
+ __func__, p->old_age, new_age);
+ p->data.age = new_age;
+
+ err = dst_mirror_write_node_data(n, &p->data);
+ if (err)
+ return err;
+ } else {
+ mutex_lock(&n->st->tree_lock);
+ first = dst_storage_tree_search(n->st, n->start);
+ if (!first) {
+ mutex_unlock(&n->st->tree_lock);
+ dprintk("%s: there are no nodes in the storage.\n", __func__);
+ return -ENODEV;
+ }
+
+ fp = first->priv;
+
+ if (fp->old_age != p->data.age) {
+ p->full_resync = 1;
+ sync = 0;
+ }
+
+ p->old_age = fp->old_age;
+
+ n->shared_head = first;
+ atomic_inc(&first->shared_num);
+ list_add_tail(&n->shared, &first->shared);
+ mutex_unlock(&n->st->tree_lock);
+
+ if (sync) {
+ unsigned long flags;
+ unsigned int pidx, pnum;
+
+ err = dst_mirror_process_log_on_disk(n, READ);
+ if (err)
+ goto err_out_put;
+
+ spin_lock_irqsave(&fp->log_lock, flags);
+ if (fp->data.write_idx != p->data.write_idx)
+ sync = 0;
+ spin_unlock_irqrestore(&fp->log_lock, flags);
+
+ pnum = p->data.resync_idx / DST_LOG_ENTRIES_PER_PAGE;
+ pidx = p->data.resync_idx % DST_LOG_ENTRIES_PER_PAGE;
+
+ if (p->log.entries[pnum][pidx].error)
+ sync = 0;
+ }
+ }
+
+ if (!sync) {
+ printk(KERN_NOTICE "%s: node %llu:%llu is not synced with the first one: "
+ "first_age: %llx, new_age: %llx, indexes %s.\n",
+ __func__, n->start, n->start+n->size,
+ p->data.age, p->old_age,
+ p->full_resync ? "does not match" : "match");
+ dst_mirror_mark_notsync(n);
+ } else {
+ if (clean_on_sync)
+ dst_mirror_mark_node_sync(n);
+ complete(&p->resync_complete);
+
+ printk(KERN_NOTICE "%s: node %llu:%llu is in sync with the first node.\n",
+ __func__, n->start, n->start+n->size);
+ }
+
+ dprintk("%s: age: old: %llx, new: %llx.\n", __func__, p->old_age, fp->data.age);
+
+ return 0;
+
+err_out_put:
+ first = n->shared_head;
+ atomic_dec(&first->shared_num);
+ mutex_lock(&n->st->tree_lock);
+ list_del(&n->shared);
+ n->shared_head = NULL;
+ mutex_unlock(&n->st->tree_lock);
+ dst_node_put(first);
+
+ return err;
+}
+
+static int dst_mirror_end_io(struct bio *bio, unsigned int size, int err)
+{
+ struct dst_request *req = bio->bi_private;
+
+ if (bio->bi_size)
+ return 0;
+
+ dprintk("%s: req: %p, bio: %p, req->bio: %p, err: %d.\n",
+ __func__, req, bio, req->bio, err);
+ req->bio_endio(req, err);
+ bio_put(bio);
+ return 0;
+}
+
+static int dst_mirror_process_request_nosync(struct dst_request *req,
+ struct dst_node *n)
+{
+ int err = 0;
+
+ /*
+ * Block layer requires to clone a bio.
+ */
+ if (n->bdev) {
+ struct bio *clone = bio_alloc_bioset(GFP_NOIO,
+ req->bio->bi_max_vecs, dst_mirror_bio_set);
+
+ __bio_clone(clone, req->bio);
+
+ clone->bi_bdev = n->bdev;
+ clone->bi_destructor = dst_mirror_destructor;
+ clone->bi_private = req;
+ clone->bi_end_io = &dst_mirror_end_io;
+
+ dprintk("%s: clone: %p, bio: %p, req: %p.\n",
+ __func__, clone, req->bio, req);
+
+ generic_make_request(clone);
+ err = 1;
+ } else {
+ struct dst_request nr;
+ /*
+ * Network state processing engine will clone request
+ * by itself if needed. We can not use the same structure
+ * here, since number of its fields will be modified.
+ */
+ memcpy(&nr, req, sizeof(struct dst_request));
+
+ nr.node = n;
+ nr.state = n->state;
+ nr.priv = req;
+
+ err = kst_check_permissions(n->state, req->bio);
+ if (!err)
+ err = n->state->ops->push(&nr);
+ }
+
+ dprintk("%s: req: %p, n: %p, bdev: %p, err: %d.\n",
+ __func__, req, n, n->bdev, err);
+
+ return err;
+}
+
+static int dst_mirror_sync_requeue(struct dst_node *n, int exiting)
+{
+ struct dst_mirror_priv *p = n->priv;
+ struct dst_mirror_sync_container *sc;
+ struct dst_request req;
+ unsigned long flags;
+ int err, num = 0;
+
+ if (!list_empty(&p->backlog_list))
+ dprintk("%s: n: %p, backlog_empty: %d, resync_num: %d.\n",
+ __func__, n, list_empty(&p->backlog_list),
+ atomic_read(&p->resync_num));
+
+ while (!list_empty(&p->backlog_list)) {
+ sc = NULL;
+ spin_lock_irqsave(&p->backlog_lock, flags);
+ if (!list_empty(&p->backlog_list)) {
+ sc = list_entry(p->backlog_list.next,
+ struct dst_mirror_sync_container,
+ sync_entry);
+ if (bio_rw(sc->bio) == WRITE)
+ list_del(&sc->sync_entry);
+ else
+ sc = NULL;
+ }
+ spin_unlock_irqrestore(&p->backlog_lock, flags);
+
+ if (!sc)
+ break;
+
+ sc->bio->bi_private = n;
+ if (sc->bio->bi_size == 0 || exiting) {
+ err = -EIO;
+ goto out;
+ }
+
+ memset(&req, 0, sizeof(struct dst_request));
+ dst_fill_request(&req, sc->bio, sc->start - n->start,
+ n, &kst_bio_endio);
+
+ err = dst_mirror_process_request_nosync(&req, n);
+out:
+ if (err < 0)
+ bio_endio(sc->bio, sc->bio->bi_size, err);
+ kfree(sc);
+ num++;
+ }
+
+ return num;
+}
+
+
+static void dst_mirror_resync_work(struct work_struct *work)
+{
+ struct dst_mirror_priv *priv = container_of(work,
+ struct dst_mirror_priv, resync_work.work);
+ struct dst_node *n = priv->node;
+
+ dst_mirror_resync(n, 0);
+ dst_mirror_sync_requeue(n, 0);
+ schedule_delayed_work(&priv->resync_work, priv->resync_timeout);
+}
+
+/*
+ * Mirroring log is used to store write request information.
+ * It is allocated on disk and in memory (sync happens each time
+ * resync work queue fires), and eats about 1% of free RAM or disk
+ * (what is less). Each write updates log, so when node goes offline,
+ * its log will be updated with error values, so that this entries
+ * could be resynced when node will be back online. When number of
+ * failed writes becomes equal to number of entries in the write log,
+ * recovery becomes impossible (since old log entries were overwritten)
+ * and full resync is scheduled.
+ *
+ * This does not work well with the situation, when there are multiple
+ * writes to the same locations - they are considered as different
+ * writes and thus will be resynced multiple times.
+ * The right solution is to check log for each write, better if log
+ * would be not array, but tree.
+ */
+static int dst_mirror_log_init(struct dst_node *n)
+{
+ struct dst_mirror_priv *priv = n->priv;
+ struct dst_mirror_log *log = &priv->log;
+ struct dst_mirror_node_data *pd = &priv->data;
+ u64 allowed_ram = DIV_ROUND_UP(global_page_state(NR_FREE_PAGES), 100);
+ u64 allowed_disk = DIV_ROUND_UP(to_bytes(n->size), 100);
+ u64 num;
+ unsigned int nr_pages, i;
+ int err;
+
+ err = dst_mirror_read_node_data(n, pd);
+ if (err)
+ return err;
+
+ allowed_ram <<= PAGE_SHIFT;
+
+ num = min(allowed_disk, allowed_ram)/sizeof(struct dst_write_entry);
+ nr_pages = min(allowed_disk, allowed_ram) >> PAGE_SHIFT;
+
+ log->entries = kzalloc(nr_pages * sizeof(void *), GFP_KERNEL);
+ if (!log->entries)
+ return -ENOMEM;
+
+ for (i=0; i<nr_pages; ++i) {
+ log->entries[i] = kzalloc(PAGE_SIZE, GFP_KERNEL);
+ if (!log->entries[i])
+ goto err_out_free;
+ }
+
+ log->nr_pages = nr_pages;
+
+ pd->num = num;
+ pd->write_idx = pd->resync_idx = 0;
+
+ printk(KERN_INFO "%s: mirror write log contains %llu entries (%u pages).\n",
+ __func__, num, nr_pages);
+
+ return 0;
+
+err_out_free:
+ while (i-- != 0)
+ kfree(log->entries[i]);
+ kfree(log->entries);
+
+ return -ENOMEM;
+}
+
+static void dst_mirror_log_exit(struct dst_node *n)
+{
+ struct dst_mirror_priv *priv = n->priv;
+ unsigned int i;
+
+ for (i=0; i<priv->log.nr_pages; ++i)
+ kfree(priv->log.entries[i]);
+ kfree(priv->log.entries);
+}
+
+/*
+ * This callback is invoked when node is added to storage.
+ */
+static int dst_mirror_add_node(struct dst_node *n)
+{
+ struct dst_storage *st = n->st;
+ struct dst_mirror_priv *priv;
+ int err = -ENOMEM, first_node = 0;
+ u64 disk_size;
+
+ n->size--; /* A sector size actually. */
+
+ priv = kzalloc(sizeof(struct dst_mirror_priv), GFP_KERNEL);
+ if (!priv)
+ return -ENOMEM;
+
+ priv->ndp_sector = n->size;
+ priv->node = n;
+ priv->resync_start = 0;
+ priv->resync_size = to_sector(1024*1024*100ULL);
+ init_completion(&priv->resync_complete);
+ atomic_set(&priv->resync_num, 0);
+ INIT_DELAYED_WORK(&priv->resync_work, dst_mirror_resync_work);
+ priv->resync_timeout = 1000;
+
+ spin_lock_init(&priv->resync_wait_lock);
+ INIT_LIST_HEAD(&priv->resync_wait_list);
+ priv->resync_wait_num = 0;
+
+ spin_lock_init(&priv->backlog_lock);
+ INIT_LIST_HEAD(&priv->backlog_list);
+
+ n->priv_callback = &dst_mirror_handle_priv;
+ n->priv = priv;
+
+ spin_lock_init(&priv->log_lock);
+
+ err = dst_mirror_log_init(n);
+ if (err)
+ goto err_out_free;
+
+ n->size -= to_sector(priv->log.nr_pages << PAGE_SHIFT);
+
+ mutex_lock(&st->tree_lock);
+ disk_size = st->disk_size;
+ if (st->disk_size) {
+ st->disk_size = min(n->size, st->disk_size);
+ } else {
+ st->disk_size = n->size;
+ first_node = 1;
+ }
+ mutex_unlock(&st->tree_lock);
+
+ err = dst_mirror_ndp_setup(n, first_node, 1);
+ if (err)
+ goto err_out_free_log;
+
+ schedule_delayed_work(&priv->resync_work, priv->resync_timeout);
+
+ dprintk("%s: n: %p, %llu:%llu, disk_size: %llu.\n",
+ __func__, n, n->start, n->size, st->disk_size);
+
+ return 0;
+
+err_out_free_log:
+ mutex_lock(&st->tree_lock);
+ st->disk_size = disk_size;
+ mutex_unlock(&st->tree_lock);
+ dst_mirror_log_exit(n);
+err_out_free:
+ kfree(priv);
+ n->priv = NULL;
+ return err;
+}
+
+static void dst_mirror_sync_destructor(struct bio *bio)
+{
+ struct bio_vec *bv;
+ int i;
+
+ bio_for_each_segment(bv, bio, i)
+ __free_page(bv->bv_page);
+ bio_free(bio, dst_mirror_bio_set);
+}
+
+static void dst_mirror_check_resync_complete(struct dst_node *n, int num_completed)
+{
+ struct dst_mirror_priv *priv = n->priv;
+
+ if (atomic_sub_return(num_completed, &priv->resync_num) == 0) {
+ dprintk("%s: completing resync request, start: %llu, size: %llu.\n",
+ __func__, priv->resync_start, priv->resync_size);
+ complete(&priv->resync_complete);
+ if (!priv->full_resync)
+ schedule_delayed_work(&priv->resync_work, 0);
+ }
+}
+
+static int dst_mirror_sync_check(struct dst_node *n)
+{
+ struct dst_mirror_priv *priv = n->priv;
+ struct dst_mirror_node_data *pd = &priv->data;
+ unsigned int pidx, pnum, i, j;
+ struct dst_write_entry *e;
+
+ pnum = pd->resync_idx / DST_LOG_ENTRIES_PER_PAGE;
+ pidx = pd->resync_idx % DST_LOG_ENTRIES_PER_PAGE;
+
+ for (i=pnum; i<priv->log.nr_pages; ++i) {
+ for (j=pidx; j<DST_LOG_ENTRIES_PER_PAGE; ++j) {
+ e = &priv->log.entries[i][j];
+
+ if (e->error) {
+ pd->resync_idx = i*DST_LOG_ENTRIES_PER_PAGE + j;
+ return 1;
+ }
+ }
+
+ pidx = 0;
+ }
+
+ dst_mirror_mark_node_sync(n);
+ return 0;
+}
+
+static int dst_mirror_sync_endio(struct bio *bio, unsigned int size, int err)
+{
+ dprintk("%s: bio: %p, err: %d, size: %u, op: %s.\n",
+ __func__, bio, err, bio->bi_size,
+ (bio_rw(bio) != WRITE)?"read":"write");
+
+ if (bio->bi_size)
+ return 1;
+
+ if (bio_rw(bio) != WRITE) {
+ struct dst_mirror_sync_container *sc = bio->bi_private;
+
+ if (err)
+ dst_mirror_mark_notsync(sc->node);
+
+ if (!err) {
+ bio->bi_size = sc->size;
+ bio->bi_sector = sc->start;
+ }
+ bio->bi_rw = WRITE;
+ } else {
+ struct dst_node *n = bio->bi_private;
+ struct dst_mirror_priv *priv = n->priv;
+
+ if (err)
+ dst_mirror_mark_notsync(n);
+ else if (!priv->full_resync) {
+ struct dst_mirror_node_data *pd = &priv->data;
+ unsigned long flags;
+
+ spin_lock_irqsave(&priv->log_lock, flags);
+ pd->resync_idx = (pd->resync_idx + 1) % pd->num;
+ dst_mirror_sync_check(n);
+ spin_unlock_irqrestore(&priv->log_lock, flags);
+ }
+ bio_put(bio);
+ dst_mirror_check_resync_complete(n, 1);
+ }
+
+ return 0;
+}
+
+static int dst_mirror_sync_block(struct dst_node *n,
+ u64 start, u32 size)
+{
+ struct bio *bio;
+ unsigned int nr_pages = DIV_ROUND_UP(size, PAGE_SIZE), i, nr;
+ struct page *page;
+ int err = -ENOMEM;
+ unsigned long flags;
+ struct dst_mirror_sync_container *sc;
+ struct dst_mirror_priv *priv = n->priv;
+
+ dprintk("%s: [all in sectors] start: %llu, size: %u, nr_pages: %u, disk_size: %llu.\n",
+ __func__, (u64)to_sector(start), (unsigned int)to_sector(size),
+ nr_pages, n->st->disk_size);
+
+ atomic_set(&priv->resync_num, nr_pages);
+
+ while (nr_pages) {
+ nr = min_t(unsigned int, nr_pages, BIO_MAX_PAGES);
+
+ sc = kmalloc(sizeof(struct dst_mirror_sync_container), GFP_KERNEL);
+ if (!sc)
+ return -ENOMEM;
+
+ bio = bio_alloc_bioset(GFP_NOIO, nr, dst_mirror_bio_set);
+ if (!bio)
+ goto err_out_free_sc;
+
+ bio->bi_rw = READ;
+ bio->bi_private = sc;
+ bio->bi_sector = to_sector(start);
+ bio->bi_bdev = NULL;
+ bio->bi_destructor = dst_mirror_sync_destructor;
+ bio->bi_end_io = dst_mirror_sync_endio;
+
+ for (i = 0; i < nr; ++i) {
+ page = alloc_page(GFP_NOIO);
+ if (!page)
+ break;
+
+ err = bio_add_pc_page(n->st->queue, bio, page,
+ min_t(u32, PAGE_SIZE, size), 0);
+ if (err <= 0)
+ break;
+ size -= err;
+ err = 0;
+ }
+
+ if (!bio->bi_vcnt) {
+ err = -ENOMEM;
+ goto err_out_put_bio;
+ }
+
+ sc->node = n;
+ sc->bio = bio;
+ sc->start = bio->bi_sector;
+ sc->size = bio->bi_size;
+
+ dst_mirror_check_resync_complete(n, i-1);
+
+ spin_lock_irqsave(&priv->backlog_lock, flags);
+ list_add_tail(&sc->sync_entry, &priv->backlog_list);
+ spin_unlock_irqrestore(&priv->backlog_lock, flags);
+
+ nr_pages -= bio->bi_vcnt;
+ dprintk("%s: start: %llu, size: %u/%u, bio: %p, rest_pages: %u, rest_bytes: %u.\n",
+ __func__, start, bio->bi_size, nr, bio, nr_pages, size);
+
+ start += bio->bi_size;
+
+ err = n->st->queue->make_request_fn(n->st->queue, bio);
+ if (err)
+ goto err_out_del;
+ }
+
+ return 0;
+
+err_out_del:
+ spin_lock_irqsave(&priv->backlog_lock, flags);
+ list_del(&sc->sync_entry);
+ spin_unlock_irqrestore(&priv->backlog_lock, flags);
+err_out_put_bio:
+ bio_put(bio);
+err_out_free_sc:
+ kfree(sc);
+ return err;
+}
+
+static void dst_mirror_read_endio(struct dst_request *req, int err)
+{
+ if (err)
+ dst_mirror_mark_notsync(req->node);
+
+ if (err && req->state)
+ kst_wake(req->state);
+
+ if (!err || req->callback)
+ kst_bio_endio(req, err);
+}
+
+static void dst_mirror_update_write_log(struct dst_request *req, int err)
+{
+ struct dst_mirror_priv *priv = req->node->priv;
+ struct dst_mirror_log *log = &priv->log;
+ struct dst_mirror_node_data *pd = &priv->data;
+ unsigned long flags;
+ struct dst_write_entry *e;
+ unsigned int pnum, idx;
+
+ spin_lock_irqsave(&priv->log_lock, flags);
+
+ pnum = pd->write_idx / DST_LOG_ENTRIES_PER_PAGE;
+ idx = pd->write_idx % DST_LOG_ENTRIES_PER_PAGE;
+
+ e = &log->entries[pnum][idx];
+ e->error = err;
+ e->start = req->start - to_sector(req->orig_size);
+ e->size = req->orig_size;
+
+ if (++pd->write_idx == pd->num)
+ pd->write_idx = 0;
+
+ if (test_bit(DST_NODE_NOTSYNC, &req->node->flags) &&
+ pd->write_idx == pd->resync_idx)
+ priv->full_resync = 1;
+
+ spin_unlock_irqrestore(&priv->log_lock, flags);
+}
+
+static void dst_mirror_write_endio(struct dst_request *req, int err)
+{
+ if (err) {
+ dst_mirror_mark_notsync(req->node);
+ if (req->state)
+ kst_wake(req->state);
+ }
+ dst_mirror_update_write_log(req, err);
+
+ req = req->priv;
+
+ dprintk("%s: req: %p, priv: %p err: %d, bio: %p, "
+ "cnt: %d, orig_size: %llu.\n",
+ __func__, req, req->priv, err, req->bio,
+ atomic_read(&req->refcnt), req->orig_size);
+
+ if (atomic_dec_and_test(&req->refcnt)) {
+ bio_endio(req->bio, req->orig_size, 0);
+ dst_free_request(req);
+ }
+}
+
+static int dst_mirror_process_request(struct dst_request *req,
+ struct dst_node *n)
+{
+ int err;
+
+ dst_mirror_sync_requeue(n, 0);
+ err = dst_mirror_process_request_nosync(req, n);
+ if (err > 0)
+ err = 0;
+ if (err)
+ req->bio_endio(req, err);
+
+ return err;
+}
+
+static int dst_mirror_write(struct dst_request *oreq)
+{
+ struct dst_node *n, *node = oreq->node;
+ struct dst_request *req = oreq;
+ int num, err = 0, err_num = 0, orig_num;
+ struct dst_mirror_priv *priv = node->priv;
+ unsigned long flags;
+
+ /*
+ * This check is for requests which fell into resync window.
+ * Such requests are written when resync window moves forward.
+ */
+ if (oreq->bio_endio != &dst_mirror_write_endio) {
+ req = dst_clone_request(oreq, oreq->node->w->req_pool);
+ if (!req) {
+ err = -ENOMEM;
+ goto err_out_exit;
+ }
+
+ req->priv = req;
+ req->bio_endio = &dst_mirror_write_endio;
+ }
+
+ if (test_bit(DST_NODE_NOTSYNC, &node->flags) &&
+ oreq->start >= priv->resync_start &&
+ to_sector(oreq->orig_size) <= priv->resync_size &&
+ priv->full_resync) {
+ dprintk("%s: queueing request: start: %llu, size: %llu, resync window start: %llu, size: %llu.\n",
+ __func__, oreq->start, (u64)to_sector(oreq->orig_size),
+ priv->resync_start, priv->resync_size);
+ spin_lock_irqsave(&priv->resync_wait_lock, flags);
+ list_add_tail(&req->request_list_entry, &priv->resync_wait_list);
+ priv->resync_wait_num++;
+ spin_unlock_irqrestore(&priv->resync_wait_lock, flags);
+ return 0;
+ }
+
+ /*
+ * This logic is pretty simple - req->bio_endio will not
+ * call bio_endio() until all mirror devices completed
+ * processing of the request (no matter with or without error).
+ * Mirror's req->bio_endio callback will take care of that.
+ */
+ orig_num = num = atomic_read(&req->node->shared_num) + 1;
+ atomic_set(&req->refcnt, num);
+
+ dprintk("\n%s: req: %p, mirror to %d nodes.\n",
+ __func__, req, num);
+
+ err = dst_mirror_process_request(req, node);
+ if (err)
+ err_num++;
+
+ if (--num) {
+ list_for_each_entry(n, &node->shared, shared) {
+ dprintk("\n%s: req: %p, start: %llu, size: %llu, "
+ "num: %d, n: %p, state: %p.\n",
+ __func__, req, req->start,
+ req->size, num, n, n->state);
+
+ err = dst_mirror_process_request(req, n);
+ if (err)
+ err_num++;
+
+ if (--num <= 0)
+ break;
+ }
+ }
+
+ if (err_num == orig_num)
+ dprintk("%s: req: %p, num: %d, err: %d.\n",
+ __func__, req, num, err);
+
+ err = 0;
+
+err_out_exit:
+ return err;
+}
+
+static int dst_mirror_read(struct dst_request *req)
+{
+ struct dst_node *node = req->node, *n, *min_dist_node;
+ struct dst_mirror_priv *priv = node->priv;
+ u64 dist, d;
+ int err;
+
+ req->bio_endio = &dst_mirror_read_endio;
+
+ do {
+ err = -ENODEV;
+ min_dist_node = NULL;
+ dist = -1ULL;
+
+ /*
+ * Reading is never performed from the node under resync.
+ */
+
+ if (!test_bit(DST_NODE_NOTSYNC, &node->flags)) {
+ priv = node->priv;
+ if (req->start > priv->last_start)
+ dist = req->start - priv->last_start;
+ else
+ dist = priv->last_start - req->start;
+ min_dist_node = req->node;
+ }
+
+ list_for_each_entry(n, &node->shared, shared) {
+ if (test_bit(DST_NODE_NOTSYNC, &n->flags))
+ continue;
+
+ priv = n->priv;
+
+ if (req->start > priv->last_start)
+ d = req->start - priv->last_start;
+ else
+ d = priv->last_start - req->start;
+
+ if (d < dist)
+ min_dist_node = n;
+ }
+
+ if (!min_dist_node)
+ break;
+
+ priv = min_dist_node->priv;
+ priv->last_start = req->start;
+
+ req->node = min_dist_node;
+ req->state = req->node->state;
+
+ if (req->node->bdev) {
+ req->bio->bi_bdev = req->node->bdev;
+ generic_make_request(req->bio);
+ err = 0;
+ break;
+ }
+
+ err = req->state->ops->push(req);
+ if (err) {
+ dprintk("%s: req: %p, bio: %p, node: %p, err: %d.\n",
+ __func__, req, req->bio, min_dist_node, err);
+ dst_mirror_mark_notsync(req->node);
+ }
+ } while (err && min_dist_node);
+
+ if (err || !min_dist_node) {
+ dprintk("%s: req: %p, bio: %p, node: %p, err: %d.\n",
+ __func__, req, req->bio, min_dist_node, err);
+ if (!err)
+ err = -ENODEV;
+ }
+ dprintk("%s: req: %p, err: %d.\n", __func__, req, err);
+ return err;
+}
+
+/*
+ * This callback is invoked from block layer request processing function,
+ * its task is to remap block request to different nodes.
+ */
+static int dst_mirror_remap(struct dst_request *req)
+{
+ int (*remap[])(struct dst_request *) =
+ {&dst_mirror_read, &dst_mirror_write};
+
+ return remap[bio_rw(req->bio) == WRITE](req);
+}
+
+static void dst_mirror_write_queued(struct dst_node *n)
+{
+ struct dst_mirror_priv *priv = n->priv;
+ unsigned long flags;
+ struct dst_request *req;
+ int num = priv->resync_wait_num, err;
+
+ while (!list_empty(&priv->resync_wait_list) && num != 0) {
+ req = NULL;
+ spin_lock_irqsave(&priv->resync_wait_lock, flags);
+ if (!list_empty(&priv->resync_wait_list)) {
+ req = list_entry(priv->resync_wait_list.next,
+ struct dst_request,
+ request_list_entry);
+ list_del_init(&req->request_list_entry);
+ num--;
+ }
+ spin_unlock_irqrestore(&priv->resync_wait_lock, flags);
+
+ if (!req)
+ break;
+
+ dprintk("%s: queued request n: %p, req: %p, start: %llu, size: %llu, num: %d.\n",
+ __func__, n, req, req->start, (u64)to_sector(req->size), num);
+ err = dst_mirror_process_request(req, n);
+ if (err)
+ break;
+ }
+}
+
+static int dst_mirror_resync_partial(struct dst_node *node)
+{
+ struct dst_storage *st = node->st;
+ struct dst_node *first = node->shared_head, *n, *sync;
+ struct dst_mirror_priv *p = node->priv, *sp;
+ struct dst_mirror_node_data *pd = &p->data;
+ struct dst_mirror_node_data *spd;
+ struct dst_write_entry *e, *ep;
+ unsigned long flags;
+ unsigned int pnum, idx;
+ u64 start;
+ u32 size;
+
+ if (!first)
+ first = node;
+
+ sync = NULL;
+ mutex_lock(&st->tree_lock);
+ list_for_each_entry(n, &first->shared, shared) {
+ if (!test_bit(DST_NODE_NOTSYNC, &n->flags)) {
+ sync = n;
+ dst_node_get(sync);
+ break;
+ }
+ }
+ mutex_unlock(&st->tree_lock);
+ dprintk("%s: n: %p, first: %p, sync: %p.\n", __func__, node, first, sync);
+
+ if (!sync)
+ return -ENODEV;
+
+ sp = sync->priv;
+ spd = &sp->data;
+
+ spin_lock_irqsave(&sp->log_lock, flags);
+ spin_lock(&p->log_lock);
+
+ pnum = pd->resync_idx / DST_LOG_ENTRIES_PER_PAGE;
+ idx = pd->resync_idx % DST_LOG_ENTRIES_PER_PAGE;
+
+ ep = sp->log.entries[pnum];
+ e = &ep[idx];
+
+ start = e->start;
+ size = e->size;
+
+ spin_unlock(&p->log_lock);
+ spin_unlock_irqrestore(&sp->log_lock, flags);
+
+ dprintk("%s: node write_idx: %llu, resync_idx: %llu, num: %llu, sync write_idx: %llu, num: %llu.\n",
+ __func__, pd->write_idx, pd->resync_idx, pd->num, spd->write_idx, spd->num);
+ dprintk("%s: sync request: start: %llu, size: %llu.\n",
+ __func__, start, (u64)to_sector(size));
+
+ dst_node_put(sync);
+
+ return dst_mirror_sync_block(node, to_bytes(start), size);
+}
+
+/*
+ * Resync logic - sliding window algorithm.
+ *
+ * At startup system checks age (unique cookie) of the node and if it
+ * does not match first node it resyncs all data from the first node in
+ * the mirror to others (non-sync nodes), each non-synced node has a
+ * window, which slides from the start of the node to the end.
+ * During resync all requests, which enter the window are queued, thus
+ * window has to be sufficiently small. When window is synced from the
+ * other nodes, queued requests are written and window moves forward,
+ * thus subsequent resync is started when previous window is fully completed.
+ * When window reaches end of the node, it is marked as synchronized.
+ *
+ * If age of the node matches the first one, but log contains different
+ * number of write log entries compared to the first node (first node always
+ * stands as a clean), then partial resync is scheduled.
+ * Partial resync will also be scheduled when log entry pointed by resync
+ * index of the node contains error.
+ *
+ * Mechanism of this resync type is following: system selects a sync node
+ * (checking each node's flags) and fetches a log entry pointed by resync
+ * index of the given node and resync data from other nodes to given one.
+ * Then it checks the rest of the write log and checks if there are
+ * another failed writes, so that next resync block would be fetched for
+ * them.
+ */
+static int dst_mirror_resync(struct dst_node *n, int ndp)
+{
+ struct dst_mirror_priv *priv = n->priv;
+ struct dst_mirror_priv *fp = priv;
+ u64 total, allowed, size;
+ int err;
+
+ if (n->shared_head)
+ fp = n->shared_head->priv;
+
+ if (!test_bit(DST_NODE_NOTSYNC, &n->flags))
+ return 0;
+ if (atomic_read(&priv->resync_num) != 0) {
+ dprintk("%s: n: %p, resync_num: %d.\n",
+ __func__, n, atomic_read(&priv->resync_num));
+ return -EAGAIN;
+ }
+
+ allowed = global_page_state(NR_FREE_PAGES) +
+ global_page_state(NR_FILE_PAGES);
+ allowed /= 2;
+ allowed = to_sector(allowed << PAGE_SHIFT);
+
+ size = min(priv->resync_size, n->size - priv->resync_start);
+
+ total = min(allowed, size);
+
+ printk(KERN_NOTICE "%s: node: %p, %llu:%llu %s synchronization has been started "
+ "from %llu, allowed: %llu, total: %llu.\n",
+ __func__, n, n->start, n->size,
+ (priv->data.age == fp->data.age) ? "partial" : "full",
+ priv->resync_start, allowed, total);
+
+ if (!priv->full_resync)
+ return dst_mirror_resync_partial(n);
+
+ dst_mirror_write_queued(n);
+
+ if (priv->resync_start == n->size) {
+ dst_mirror_mark_node_sync(n);
+ priv->data.age = fp->data.age;
+ dst_mirror_write_node_data(n, &priv->data);
+ return 0;
+ }
+
+ if (ndp) {
+ err = dst_mirror_ndp_setup(n, 0, 0);
+ if (err)
+ return err;
+ }
+
+ err = dst_mirror_sync_block(n, to_bytes(priv->resync_start),
+ to_bytes(total));
+ if (!err)
+ priv->resync_start += total;
+
+ return err;
+}
+
+static int dst_mirror_error(struct kst_state *st, int err)
+{
+ struct dst_request *req, *tmp;
+ unsigned int revents = st->socket->ops->poll(NULL, st->socket, NULL);
+
+ dprintk("%s: err: %d, revents: %x, notsync: %d.\n",
+ __func__, err, revents,
+ test_bit(DST_NODE_NOTSYNC, &st->node->flags));
+
+ if (err == -EEXIST)
+ return err;
+
+ if (!(revents & (POLLERR | POLLHUP)) &&
+ (err == -EPIPE || err == -ECONNRESET)) {
+ if (test_bit(DST_NODE_NOTSYNC, &st->node->flags))
+ return dst_mirror_resync(st->node, 1);
+ return 0;
+ }
+
+ if (atomic_read(&st->node->shared_num) == 0 &&
+ !st->node->shared_head) {
+ dprintk("%s: this node is the only one in the mirror, "
+ "can not mark it notsync.\n", __func__);
+ return err;
+ }
+
+ dst_mirror_mark_notsync(st->node);
+
+ mutex_lock(&st->request_lock);
+ list_for_each_entry_safe(req, tmp, &st->request_list,
+ request_list_entry) {
+ kst_del_req(req);
+ dprintk("%s: requeue [%c], start: %llu, idx: %d,"
+ " num: %d, size: %llu, offset: %u, err: %d.\n",
+ __func__, (bio_rw(req->bio) == WRITE)?'W':'R',
+ req->start, req->idx, req->num, req->size,
+ req->offset, err);
+
+ if (bio_rw(req->bio) != WRITE) {
+ req->start -= to_sector(req->orig_size - req->size);
+ req->size = req->orig_size;
+ req->flags &= ~(DST_REQ_HEADER_SENT | DST_REQ_CHEKSUM_RECV);
+ req->idx = 0;
+ if (dst_mirror_read(req))
+ dst_free_request(req);
+ } else {
+ kst_complete_req(req, err);
+ }
+ }
+ mutex_unlock(&st->request_lock);
+ return err;
+}
+
+/*
+ * This callback is invoked when node is removed from storage.
+ */
+static void dst_mirror_del_node(struct dst_node *n)
+{
+ struct dst_mirror_priv *priv = n->priv;
+
+ priv->full_resync = 1;
+ cancel_rearming_delayed_work(&priv->resync_work);
+ flush_scheduled_work();
+
+ dprintk("%s: n: %p, backlog_empty: %d, resync_num: %d.\n",
+ __func__, n, list_empty(&priv->backlog_list),
+ atomic_read(&priv->resync_num));
+
+ /*
+ * This strange-looking loop waits until all resync read requests
+ * are completed, this happens in dst_mirror_sync_requeue().
+ */
+ while (atomic_read(&priv->resync_num)) {
+ dst_mirror_sync_requeue(n, 1);
+ if (printk_ratelimit())
+ dprintk("%s: n: %p, backlog_empty: %d, resync_num: %d.\n",
+ __func__, n, list_empty(&priv->backlog_list),
+ atomic_read(&priv->resync_num));
+ msleep(100);
+ }
+
+ wait_for_completion(&priv->resync_complete);
+ dst_mirror_sync_requeue(n, 1);
+
+ if (priv) {
+ dst_mirror_log_exit(n);
+ kfree(priv);
+ n->priv = NULL;
+ }
+
+ if (n->device.parent == &n->st->device) {
+ int i;
+
+ for (i=0; i<ARRAY_SIZE(dst_mirror_attrs); ++i)
+ device_remove_file(&n->device, &dst_mirror_attrs[i]);
+ }
+}
+
+static struct dst_alg_ops alg_mirror_ops = {
+ .remap = dst_mirror_remap,
+ .add_node = dst_mirror_add_node,
+ .del_node = dst_mirror_del_node,
+ .error = dst_mirror_error,
+ .owner = THIS_MODULE,
+};
+
+static int __devinit alg_mirror_init(void)
+{
+ int err = -ENOMEM;
+
+ dst_mirror_bio_set = bioset_create(256, 256);
+ if (!dst_mirror_bio_set)
+ return -ENOMEM;
+
+ alg_mirror = dst_alloc_alg("alg_mirror", &alg_mirror_ops);
+ if (!alg_mirror)
+ goto err_out;
+
+ return 0;
+
+err_out:
+ bioset_free(dst_mirror_bio_set);
+ return err;
+}
+
+static void __devexit alg_mirror_exit(void)
+{
+ dst_remove_alg(alg_mirror);
+ bioset_free(dst_mirror_bio_set);
+}
+
+module_init(alg_mirror_init);
+module_exit(alg_mirror_exit);
+
+MODULE_LICENSE("GPL");
+MODULE_AUTHOR("Evgeniy Polyakov <[email protected]>");
+MODULE_DESCRIPTION("Mirror distributed algorithm.");

2007-12-17 16:27:24

by Kay Sievers

[permalink] [raw]
Subject: Re: [1/4] DST: Distributed storage documentation.

On Dec 17, 2007 4:03 PM, Evgeniy Polyakov <[email protected]> wrote:

> +++ b/Documentation/dst/sysfs.txt
> @@ -0,0 +1,33 @@
> +This file describes sysfs files created for each storage.
> +
> +1. Per-storage files.
> +Each storage has its own dir /sysfs/devices/$storage_name,

> +2. Per-node files.
> +Node's files are located in /sysfs/devices/$storage_name/n-$start-$cookie

As already pointed out last time, you can't reference /sys/devices/ directly,
please use the path from the bus/class directory which points there.

Thanks,
Kay

2007-12-18 01:00:31

by David Chinner

[permalink] [raw]
Subject: Re: [0/4] DST: Distributed storage.

On Mon, Dec 17, 2007 at 06:03:38PM +0300, Evgeniy Polyakov wrote:
> DST passed all FS tests in LTP with XFS (modulo MAX_LOCK_DEPTH too low bug:
> [ 8398.605691] BUG: MAX_LOCK_DEPTH too low!
> [ 8398.609641] turning off the locking correctness validator.

Evgeniy, can you please start reporting these XFS problems you are
coming across to the XFS list ([email protected])? They may be
real issues that we need to address and we should not be hearing
about them for the first time in the release notes for a block
device project....

Cheers,

Dave.
--
Dave Chinner
Principal Engineer
SGI Australian Software Group

2007-12-18 11:09:22

by Evgeniy Polyakov

[permalink] [raw]
Subject: Re: [0/4] DST: Distributed storage.

Hi David.

On Tue, Dec 18, 2007 at 12:00:04PM +1100, David Chinner ([email protected]) wrote:
> On Mon, Dec 17, 2007 at 06:03:38PM +0300, Evgeniy Polyakov wrote:
> > DST passed all FS tests in LTP with XFS (modulo MAX_LOCK_DEPTH too low bug:
> > [ 8398.605691] BUG: MAX_LOCK_DEPTH too low!
> > [ 8398.609641] turning off the locking correctness validator.
>
> Evgeniy, can you please start reporting these XFS problems you are
> coming across to the XFS list ([email protected])? They may be
> real issues that we need to address and we should not be hearing
> about them for the first time in the release notes for a block
> device project....

It is not XFS as is, but lock validator warning. I just found it
working with XFS - it can be anything else: VFS, block layer, DST itself
(there is number of locks too), so I did not fill the bug against
filesystem, but pointed that some problem, probalby harmless,
exists in tested environment.

--
Evgeniy Polyakov