Return-Path: Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1761857AbXIJWPR (ORCPT ); Mon, 10 Sep 2007 18:15:17 -0400 Received: (majordomo@vger.kernel.org) by vger.kernel.org id S932221AbXIJWO7 (ORCPT ); Mon, 10 Sep 2007 18:14:59 -0400 Received: from e2.ny.us.ibm.com ([32.97.182.142]:56555 "EHLO e2.ny.us.ibm.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1756717AbXIJWOs (ORCPT ); Mon, 10 Sep 2007 18:14:48 -0400 Date: Mon, 10 Sep 2007 15:14:45 -0700 From: "Paul E. McKenney" To: Evgeniy Polyakov Cc: netdev@vger.kernel.org, linux-kernel@vger.kernel.org, linux-fsdevel@vger.kernel.org Subject: Re: Distributed storage. Security attributes and ducumentation update. Message-ID: <20070910221445.GL11801@linux.vnet.ibm.com> Reply-To: paulmck@linux.vnet.ibm.com References: <20070831160611.GA21660@2ka.mipt.ru> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20070831160611.GA21660@2ka.mipt.ru> User-Agent: Mutt/1.5.13 (2006-08-11) Sender: linux-kernel-owner@vger.kernel.org X-Mailing-List: linux-kernel@vger.kernel.org Content-Length: 127610 Lines: 4699 On Fri, Aug 31, 2007 at 08:06:13PM +0400, Evgeniy Polyakov wrote: > On Tue, Jul 31, 2007 at 09:13:47PM +0400, Evgeniy Polyakov (johnpol@2ka.mipt.ru) wrote: > Hi. > > I'm pleased to announce third release of the distributed storage > subsystem, which allows to form a storage on top of remote and local > nodes, which in turn can be exported to another storage as a node to > form tree-like storages. > > This release includes following changes: > * security attributes (permission mask assigned to addresses, allowed to > connect to given local export node) > * big documentation update (userspace documentation on the site also > includes various usage case examples and descirption of the > configuration utilitiy, protocols and userspace target) > * mirror algorithm has been moved from per-page to per-sector dirty > bitmask > > Further TODO list includes: > * implement optional saving of mirroring/linear information on the remote > nodes (simple) > * implement netlink based setup (simple) > * new redundancy algorithm (complex) > > Homepage: > http://tservice.net.ru/~s0mbre/old/?section=projects&item=dst A couple questions below, but otherwise looks good from an RCU viewpoint. Thanx, Paul > Signed-off-by: Evgeniy Polyakov > > diff --git a/Documentation/dst/algorithms.txt b/Documentation/dst/algorithms.txt > new file mode 100644 > index 0000000..bfc6984 > --- /dev/null > +++ b/Documentation/dst/algorithms.txt > @@ -0,0 +1,115 @@ > +Each storage by itself is just a set of contiguous logical blocks, with > +allowed number of operations. Nodes, each of which has own start and size, > +are placed into storage by appropriate algorithm, which remaps > +logical sector number into real node's sector. One can create > +own algorithms, since DST has pluggable interface for that. > +Currently mirrored and linear algorithms are supported. > + > +Let's briefly describe how they work. > + > +Linear algorithm. > +Simple approach of concatenating storages into single device with > +increased size is used in this algorithm. Essentially new device > +has size equal to sum of sizes of underlying nodes and nodes are > +placed one after another. > + > + /----- Node 1 ---\ /------ Node 3 ----\ > +start end start end > + |==================|========================|==================| > + | start end | > + | \------- Node 2 ---------/ | > + | | > +start end > + \-------------------------- DST storage ----------------------/ > + > + /\ > + || > + || > + > + IO operations > + > + Figure 1. > + 3 nodes combined into single storage using linear algorithm. > + > +Mirror algorithm. > +In this algorithms nodes are placed under each other, so when > +operation comes to the first one, it can be mirrored to all > +underlying nodes. In case of reading, actual data is obtained from > +the nearest node - algoritm keeps track of previous operation > +and knows where it was stopped, so that subsequent seek to the > +start of the new request will take the shortest time. > +Writing is always mirrored to all underlying nodes. > + > + IO operations > + || > + || > + \/ > + > +|---------------- DST storate -------------------| > +| prev position | > +|-------|------------ Node 1 --------------------| > +| prev pos | > +|-------------------- Node 2 -----|--------------| > +|prev pos | > +|---|---------------- Node 3 --------------------| > + > + Figure 2. > + 3 nodes combined into single storage using mirror algorithm. > + > +Each algorithm must implement number of callbacks, > +which must be registered during initialization time. > + > +struct dst_alg_ops > +{ > + int (*add_node)(struct dst_node *n); > + void (*del_node)(struct dst_node *n); > + int (*remap)(struct dst_request *req); > + int (*error)(struct kst_state *state, int err); > + struct module *owner; > +}; > + > +@add_node. > +This callback is invoked when new node is being added into the storage, > +but before node is actually added into the storage, so that it could > +be accessed from it. When it is called, all appropriate initialization > +of the underlying device is already completed (system has been connected > +to remote node or got a reference to the local block device). At this > +stage algorithm can add node into private map. > +It must return zero on success or negative value otherwise. > + > +@del_node. > +This callback is invoked when node is being deleted from the storage, > +i.e. when its reference counter hits zero. It is called before > +any cleaning is performed. > +It must return zero on success or negative value otherwise. > + > +@remap. > +This callback is invoked each time new bio hits the storage. > +Request structure contains BIO itself, pointer to the node, which originally > +stores the whole region under given IO request, and various parameters > +used by storage core to process this block request. > +It must return zero on success or negative value otherwise. It is upto > +this method to call all cleaning if remapping failed, for example it must > +call kst_bio_endio() for given callback in case of error, which in turn > +will call bio_endio(). Note, that dst_request structure provided in this > +callback is allocated on stack, so if there is a need to use it outside > +of the given function, it must be cloned (it will happen automatically > +in state's push callback, but that copy will not be shared by any other > +user). > + > +@error. > +This callback is invoked for each error, which happend when processed > +requests for remote nodes or when talking to remote size > +of the local export node (state contains data related to data > +transfers over the network). > +If this function has fixed given error, it must return 0 or negative > +error value otherwise. > + > +@owner. > +This is module reference counter updated automatically by DST core. > + > +Algorithm must provide its name and above structure to the > +dst_alloc_alg() function, which will return a reference to the newly > +created algorithm. > +To remove it, one needs to call dst_remove_alg() with given algorithm > +pointer. > diff --git a/Documentation/dst/dst.txt b/Documentation/dst/dst.txt > new file mode 100644 > index 0000000..3b326aa > --- /dev/null > +++ b/Documentation/dst/dst.txt > @@ -0,0 +1,66 @@ > +Distributed storage. Design and implementation. > +http://tservice.net.ru/~s0mbre/old/?section=projects&item=dst > + > + Evgeniy Polyakov > + > +This document is intended to briefly describe design and > +implementation details of the distributed storage project, > +aimed to create ability to group physically and/or logically > +distributed storages into single device. > + > +Main operational unit in the storage is node. Node can represent > +either remote storage, connected to local machine, or local > +device, or storage exported to the outside of the system. > +Here goes small explaination of basic therms. > + > +Local node. > +This node is just a logical link between block device (with given > +major and minor numbers) and structure in the DST hierarchy, > +which represents number of sectors on the area, corresponding to given > +block device. it can be a disk, a device mapper node or stacked > +block device on top of another underlying DST nodes. > + > +Local export node. > +Essentially the same as local node, but it allows to access > +to its data via network. Remote clients can connect to given local > +export node and read or write blocks according to its size. > +Blocks are then forwarded to underlying local node and processed > +there accordingly to the nature of the local node. > + > +Remote node. > +This type of nodes contain remotely accessible devices. One can think > +about remote nodes as remote disks, which can be connected to > +local system and combined into single storage. Remote nodes > +are presented as number of sectors accessed over the network > +by the local machine, where distributed storage is being formed. > + > + > +Each node or set of them can be formed into single array, which > +in turn becomes a local node, which can be exported further by stacking > +a local export node on top of it. > + > +Each storage by itself is just a set of contiguous logical blocks, with > +allowed number of operations. Nodes, each of which has own start and size, > +are placed into storage by appropriate algorithm, which remaps > +logical sector number into real node's sector. One can create > +own algorithms, since DST has pluggable interface for that. > +Currently mirrored and linear algorithms are supported. > +One can find more details in Documentation/dst/algorithms.txt file. > + > +Main goal of the distributed storage is to combine remote nodes into > +single device, so each block IO request is being sent over the network > +(contrary requests for local nodes are handled by the gneric block > +layer features). Each network connection has number of variables which > +describe it (socket, list of requests, error handling and so on), > +which form kst_state structure. This network state is added into per-socket > +polling state machine, and can be processed by dedicated thread when > +becomes ready. This system forms asynchronous IO for given block > +requests. If block request can be processed without blocking, then > +no new structures are allocated and async part of the state is not used. > + > +When connection to the remote peer breaks, DST core tries to reconnect > +to failed node and no requests are marked as errorneous, instead > +they live in the queue until reconnectin is established. > + > +Userspace code, setup documentation and examples can be found on project's > +homepage above. > diff --git a/drivers/block/Kconfig b/drivers/block/Kconfig > index b4c8319..ca6592d 100644 > --- a/drivers/block/Kconfig > +++ b/drivers/block/Kconfig > @@ -451,6 +451,8 @@ config ATA_OVER_ETH > This driver provides Support for ATA over Ethernet block > devices like the Coraid EtherDrive (R) Storage Blade. > > +source "drivers/block/dst/Kconfig" > + > source "drivers/s390/block/Kconfig" > > endmenu > diff --git a/drivers/block/Makefile b/drivers/block/Makefile > index dd88e33..fcf042d 100644 > --- a/drivers/block/Makefile > +++ b/drivers/block/Makefile > @@ -29,3 +29,4 @@ obj-$(CONFIG_VIODASD) += viodasd.o > obj-$(CONFIG_BLK_DEV_SX8) += sx8.o > obj-$(CONFIG_BLK_DEV_UB) += ub.o > > +obj-$(CONFIG_DST) += dst/ > diff --git a/drivers/block/dst/Kconfig b/drivers/block/dst/Kconfig > new file mode 100644 > index 0000000..9c5eba2 > --- /dev/null > +++ b/drivers/block/dst/Kconfig > @@ -0,0 +1,19 @@ > +config DST > + tristate "Distributed storage" > + depends on NET > + ---help--- > + This driver allows to create a distributed storage. > + > +config DST_ALG_LINEAR > + tristate "Linear distribution algorithm" > + depends on DST > + ---help--- > + This module allows to create linear mapping of the nodes > + in the distributed storage. > + > +config DST_ALG_MIRROR > + tristate "Mirror distribution algorithm" > + depends on DST > + ---help--- > + This module allows to create a mirror of the noes in the > + distributed storage. > diff --git a/drivers/block/dst/Makefile b/drivers/block/dst/Makefile > new file mode 100644 > index 0000000..1400e94 > --- /dev/null > +++ b/drivers/block/dst/Makefile > @@ -0,0 +1,6 @@ > +obj-$(CONFIG_DST) += dst.o > + > +dst-y := dcore.o kst.o > + > +obj-$(CONFIG_DST_ALG_LINEAR) += alg_linear.o > +obj-$(CONFIG_DST_ALG_MIRROR) += alg_mirror.o > diff --git a/drivers/block/dst/alg_linear.c b/drivers/block/dst/alg_linear.c > new file mode 100644 > index 0000000..584f99e > --- /dev/null > +++ b/drivers/block/dst/alg_linear.c > @@ -0,0 +1,99 @@ > +/* > + * 2007+ Copyright (c) Evgeniy Polyakov > + * All rights reserved. > + * > + * This program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU General Public License as published by > + * the Free Software Foundation; either version 2 of the License, or > + * (at your option) any later version. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU General Public License for more details. > + */ > + > +#include > +#include > +#include > +#include > + > +static struct dst_alg *alg_linear; > + > +/* > + * This callback is invoked when node is removed from storage. > + */ > +static void dst_linear_del_node(struct dst_node *n) > +{ > +} > + > +/* > + * This callback is invoked when node is added to storage. > + */ > +static int dst_linear_add_node(struct dst_node *n) > +{ > + struct dst_storage *st = n->st; > + > + n->start = st->disk_size; > + st->disk_size += n->size; > + > + return 0; > +} > + > +static int dst_linear_remap(struct dst_request *req) > +{ > + int err; > + > + if (req->node->bdev) { > + generic_make_request(req->bio); > + return 0; > + } > + > + err = kst_check_permissions(req->state, req->bio); > + if (err) > + return err; > + > + return req->state->ops->push(req); > +} > + > +/* > + * Failover callback - it is invoked each time error happens during > + * request processing. > + */ > +static int dst_linear_error(struct kst_state *st, int err) > +{ > + if (err) > + set_bit(DST_NODE_FROZEN, &st->node->flags); > + else > + clear_bit(DST_NODE_FROZEN, &st->node->flags); > + return 0; > +} > + > +static struct dst_alg_ops alg_linear_ops = { > + .remap = dst_linear_remap, > + .add_node = dst_linear_add_node, > + .del_node = dst_linear_del_node, > + .error = dst_linear_error, > + .owner = THIS_MODULE, > +}; > + > +static int __devinit alg_linear_init(void) > +{ > + alg_linear = dst_alloc_alg("alg_linear", &alg_linear_ops); > + if (!alg_linear) > + return -ENOMEM; > + > + return 0; > +} > + > +static void __devexit alg_linear_exit(void) > +{ > + dst_remove_alg(alg_linear); > +} > + > +module_init(alg_linear_init); > +module_exit(alg_linear_exit); > + > +MODULE_LICENSE("GPL"); > +MODULE_AUTHOR("Evgeniy Polyakov "); > +MODULE_DESCRIPTION("Linear distributed algorithm."); > diff --git a/drivers/block/dst/alg_mirror.c b/drivers/block/dst/alg_mirror.c > new file mode 100644 > index 0000000..be42350 > --- /dev/null > +++ b/drivers/block/dst/alg_mirror.c > @@ -0,0 +1,765 @@ > +/* > + * 2007+ Copyright (c) Evgeniy Polyakov > + * All rights reserved. > + * > + * This program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU General Public License as published by > + * the Free Software Foundation; either version 2 of the License, or > + * (at your option) any later version. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU General Public License for more details. > + */ > + > +#include > +#include > +#include > +#include > +#include > + > +#define DST_MIRROR_MAX_CHUNKS 4096 > + > +struct dst_mirror_priv > +{ > + unsigned int chunk_num; > + > + u64 last_start; > + > + spinlock_t backlog_lock; > + struct list_head backlog_list; > + > + unsigned long *chunk; > +}; > + > +static struct dst_alg *alg_mirror; > +static struct bio_set *dst_mirror_bio_set; > + > +static ssize_t dst_mirror_chunk_mask_show(struct device *dev, > + struct device_attribute *attr, char *buf) > +{ > + struct dst_node *n = container_of(dev, struct dst_node, device); > + struct dst_mirror_priv *priv = n->priv; > + unsigned int i; > + int rest = PAGE_SIZE; > + > + for (i = 0; i < priv->chunk_num/BITS_PER_LONG; ++i) { > + int bit, j; > + > + for (j = 0; j < BITS_PER_LONG; ++j) { > + bit = (priv->chunk[i] >> j) & 1; > + sprintf(buf, "%c", (bit)?'+':'-'); > + buf++; > + } > + > + rest -= BITS_PER_LONG; > + > + if (rest < BITS_PER_LONG) > + break; > + } > + > + return PAGE_SIZE - rest; > +} > + > +static DEVICE_ATTR(chunks, 0444, dst_mirror_chunk_mask_show, NULL); > + > +/* > + * This callback is invoked when node is removed from storage. > + */ > +static void dst_mirror_del_node(struct dst_node *n) > +{ > + struct dst_mirror_priv *priv = n->priv; > + > + vfree(priv->chunk); > + kfree(priv); > + n->priv = NULL; > + > + if (n->device.parent == &n->st->device) > + device_remove_file(&n->device, &dev_attr_chunks); > +} > + > +static void dst_mirror_handle_priv(struct dst_node *n) > +{ > + if (n->priv) { > + int err; > + err = device_create_file(&n->device, &dev_attr_chunks); > + } > +} > + > +/* > + * This callback is invoked when node is added to storage. > + */ > +static int dst_mirror_add_node(struct dst_node *n) > +{ > + struct dst_storage *st = n->st; > + struct dst_mirror_priv *priv; > + > + if (st->disk_size) > + st->disk_size = min(n->size, st->disk_size); > + else > + st->disk_size = n->size; > + > + priv = kzalloc(sizeof(struct dst_mirror_priv), GFP_KERNEL); > + if (!priv) > + return -ENOMEM; > + > + priv->chunk_num = st->disk_size; > + > + priv->chunk = vmalloc(priv->chunk_num/BITS_PER_LONG * sizeof(long)); > + if (!priv->chunk) > + goto err_out_free; > + > + memset(priv->chunk, 0, priv->chunk_num/BITS_PER_LONG * sizeof(long)); > + > + spin_lock_init(&priv->backlog_lock); > + INIT_LIST_HEAD(&priv->backlog_list); > + > + dprintk("%s: %llu:%llu, chunk_num: %u, disk_size: %llu.\n", > + __func__, n->start, n->size, > + priv->chunk_num, st->disk_size); > + > + n->priv_callback = &dst_mirror_handle_priv; > + n->priv = priv; > + > + return 0; > + > +err_out_free: > + kfree(priv); > + return -ENOMEM; > +} > + > +static void dst_mirror_sync_destructor(struct bio *bio) > +{ > + struct bio_vec *bv; > + int i; > + > + bio_for_each_segment(bv, bio, i) > + __free_page(bv->bv_page); > + bio_free(bio, dst_mirror_bio_set); > +} > + > +static void dst_mirror_sync_requeue(struct dst_node *n) > +{ > + struct dst_mirror_priv *p = n->priv; > + struct dst_request *req; > + unsigned int num, idx, i; > + u64 start; > + unsigned long flags; > + int err; > + > + while (!list_empty(&p->backlog_list)) { > + req = NULL; > + spin_lock_irqsave(&p->backlog_lock, flags); > + if (!list_empty(&p->backlog_list)) { > + req = list_entry(p->backlog_list.next, > + struct dst_request, > + request_list_entry); > + list_del(&req->request_list_entry); > + } > + spin_unlock_irqrestore(&p->backlog_lock, flags); > + > + if (!req) > + break; > + > + start = req->start - to_sector(req->orig_size - req->size); > + > + idx = start; > + num = to_sector(req->orig_size); > + > + for (i=0; i + if (test_bit(idx+i, p->chunk)) > + break; > + > + dprintk("%s: idx: %u, num: %u, i: %u, req: %p, " > + "start: %llu, size: %llu.\n", > + __func__, idx, num, i, req, > + req->start, req->orig_size); > + > + err = -1; > + if (i != num) { > + err = kst_enqueue_req(n->state, req); > + if (err) { > + printk("%s: congestion [%c]: req: %p, " > + "start: %llu, size: %llu.\n", > + __func__, > + (bio_rw(req->bio) == WRITE)?'W':'R', > + req, req->start, req->size); > + kst_del_req(req); > + } > + } > + if (err) { > + req->bio_endio(req, err); > + dst_free_request(req); > + } > + } > + > + kst_wake(n->state); > +} > + > +static void dst_mirror_mark_sync(struct dst_node *n) > +{ > + if (test_bit(DST_NODE_NOTSYNC, &n->flags)) { > + clear_bit(DST_NODE_NOTSYNC, &n->flags); > + printk("%s: node: %p, %llu:%llu synchronization " > + "has been completed.\n", > + __func__, n, n->start, n->size); > + } > +} > + > +static void dst_mirror_mark_notsync(struct dst_node *n) > +{ > + if (!test_bit(DST_NODE_NOTSYNC, &n->flags)) { > + set_bit(DST_NODE_NOTSYNC, &n->flags); > + printk("%s: not synced node n: %p.\n", __func__, n); > + } > +} > + > +/* > + * Without errors it is always called under node's request lock, > + * so it is safe to requeue them. > + */ > +static void dst_mirror_bio_error(struct dst_request *req, int err) > +{ > + int i; > + struct dst_mirror_priv *priv = req->node->priv; > + unsigned int num, idx; > + void (*process_bit[])(int nr, volatile void *addr) = > + {&__clear_bit, &__set_bit}; > + u64 start = req->start - to_sector(req->orig_size - req->size); > + > + if (err) > + dst_mirror_mark_notsync(req->node); > + else > + dst_mirror_sync_requeue(req->node); > + > + priv->last_start = req->start; > + > + idx = start; > + num = to_sector(req->orig_size); > + > + dprintk("%s: req_priv: %p, chunk %p, %llu:%llu start: %llu, size: %llu, " > + "chunk_num: %u, idx: %d, num: %d, err: %d.\n", > + __func__, req->priv, priv->chunk, req->node->start, > + req->node->size, start, req->orig_size, priv->chunk_num, > + idx, num, err); > + > + if (unlikely(idx >= priv->chunk_num || idx + num > priv->chunk_num)) { > + printk("%s: %llu:%llu req: %p, start: %llu, orig_size: %llu, " > + "req_start: %llu, req_size: %llu, " > + "chunk_num: %u, idx: %d, num: %d, err: %d.\n", > + __func__, req->node->start, req->node->size, req, > + start, req->orig_size, > + req->start, req->size, > + priv->chunk_num, idx, num, err); > + return; > + } > + > + for (i=0; i + process_bit[!!err](idx+i, priv->chunk); > +} > + > +static void dst_mirror_sync_req_endio(struct dst_request *req, int err) > +{ > + unsigned long notsync = 0; > + struct dst_mirror_priv *priv = req->node->priv; > + int i; > + > + dst_mirror_bio_error(req, err); > + > + printk("%s: freeing bio: %p, bi_size: %u, " > + "orig_size: %llu, req: %p, node: %p.\n", > + __func__, req->bio, req->bio->bi_size, req->orig_size, req, > + req->node); > + > + bio_put(req->bio); > + > + for (i = 0; i < priv->chunk_num/BITS_PER_LONG; ++i) { > + notsync = priv->chunk[i]; > + > + if (notsync) > + break; > + } > + > + if (!notsync) > + dst_mirror_mark_sync(req->node); > +} > + > +static int dst_mirror_sync_endio(struct bio *bio, unsigned int size, int err) > +{ > + struct dst_request *req = bio->bi_private; > + struct dst_node *n = req->node; > + struct dst_mirror_priv *priv = n->priv; > + unsigned long flags; > + > + printk("%s: bio: %p, err: %d, size: %u, req: %p.\n", > + __func__, bio, err, bio->bi_size, req); > + > + if (bio->bi_size) > + return 1; > + > + bio->bi_rw = WRITE; > + bio->bi_size = req->orig_size; > + bio->bi_sector = req->start; > + > + if (!err) { > + spin_lock_irqsave(&priv->backlog_lock, flags); > + list_add_tail(&req->request_list_entry, &priv->backlog_list); > + spin_unlock_irqrestore(&priv->backlog_lock, flags); > + kst_wake(req->state); > + } else { > + req->bio_endio(req, err); > + dst_free_request(req); > + } > + return 0; > +} > + > +static int dst_mirror_sync_block(struct dst_node *n, > + int bit_start, int bit_num) > +{ > + u64 start = to_bytes(bit_start); > + struct bio *bio; > + unsigned int nr_pages = to_bytes(bit_num)/PAGE_SIZE, i; > + struct page *page; > + int err = -ENOMEM; > + struct dst_request *req; > + > + printk("%s: bit_start: %d, bit_num: %d, start: %llu, nr_pages: %u, " > + "disk_size: %llu.\n", > + __func__, bit_start, bit_num, start, nr_pages, > + n->st->disk_size); > + > + while (nr_pages) { > + req = dst_clone_request(NULL, n->w->req_pool); > + if (!req) > + return -ENOMEM; > + > + bio = bio_alloc_bioset(GFP_NOIO, nr_pages, dst_mirror_bio_set); > + if (!bio) > + goto err_out_free_req; > + > + bio->bi_rw = READ; > + bio->bi_private = req; > + bio->bi_sector = to_sector(start); > + bio->bi_bdev = NULL; > + bio->bi_destructor = dst_mirror_sync_destructor; > + bio->bi_end_io = dst_mirror_sync_endio; > + > + for (i = 0; i < nr_pages; ++i) { > + err = -ENOMEM; > + > + page = alloc_page(GFP_NOIO); > + if (!page) > + break; > + > + err = bio_add_pc_page(n->st->queue, bio, > + page, PAGE_SIZE, 0); > + if (err <= 0) > + break; > + err = 0; > + } > + > + if (err && !bio->bi_vcnt) > + goto err_out_put_bio; > + > + req->node = n; > + req->state = n->state; > + req->start = bio->bi_sector; > + req->size = req->orig_size = bio->bi_size; > + req->bio = bio; > + req->idx = bio->bi_idx; > + req->num = bio->bi_vcnt; > + req->bio_endio = &dst_mirror_sync_req_endio; > + req->callback = &kst_data_callback; > + > + dprintk("%s: start: %llu, size(pages): %u, bio: %p, " > + "size: %u, cnt: %d, req: %p, size: %llu.\n", > + __func__, bio->bi_sector, nr_pages, bio, > + bio->bi_size, bio->bi_vcnt, req, req->size); > + > + err = n->st->queue->make_request_fn(n->st->queue, bio); > + if (err) > + goto err_out_put_bio; > + > + nr_pages -= bio->bi_vcnt; > + start += bio->bi_size; > + } > + > + return 0; > + > +err_out_put_bio: > + bio_put(bio); > +err_out_free_req: > + dst_free_request(req); > + return err; > +} > + > +/* > + * Resync logic. > + * > + * System allocates and queues requests for number of regions. > + * Each request initially is reading from the one of the nodes. > + * When it is completed, system checks if given region was already > + * written to, and in such case just drops read request, otherwise > + * it writes it to the node being updated. Any write clears not-uptodate > + * bit, which is used as a flag that region must be synchronized or not. > + * Reading is never performed from the node under resync. > + */ > +static int dst_mirror_resync(struct dst_node *n) > +{ > + int err = 0, sync = 0; > + struct dst_mirror_priv *priv = n->priv; > + unsigned int i; > + > + printk("%s: node: %p, %llu:%llu synchronization has been started.\n", > + __func__, n, n->start, n->size); > + > + for (i = 0; i < priv->chunk_num/BITS_PER_LONG; ++i) { > + int bit, num, start; > + unsigned long word = priv->chunk[i]; > + > + if (!word) > + continue; > + > + num = 0; > + start = -1; > + while (word && num < BITS_PER_LONG) { > + bit = __ffs(word); > + if (start == -1) > + start = bit; > + num++; > + word >>= (bit+1); > + } > + > + if (start != -1) { > + err = dst_mirror_sync_block(n, start + i*BITS_PER_LONG, > + num); > + if (err) > + break; > + sync++; > + } > + } > + > + if (!sync && !err) > + dst_mirror_mark_sync(n); > + > + return err; > +} > + > +static void dst_mirror_destructor(struct bio *bio) > +{ > + dprintk("%s: bio: %p.\n", __func__, bio); > + bio_free(bio, dst_mirror_bio_set); > +} > + > +static int dst_mirror_end_io(struct bio *bio, unsigned int size, int err) > +{ > + struct dst_request *req = bio->bi_private; > + > + if (bio->bi_size) > + return 0; > + > + dprintk("%s: req: %p, bio: %p, req->bio: %p, err: %d.\n", > + __func__, req, bio, req->bio, err); > + req->bio_endio(req, err); > + bio_put(bio); > + return 0; > +} > + > +static void dst_mirror_read_endio(struct dst_request *req, int err) > +{ > + dst_mirror_bio_error(req, err); > + > + if (!err) > + kst_bio_endio(req, 0); > +} > + > +static void dst_mirror_write_endio(struct dst_request *req, int err) > +{ > + dst_mirror_bio_error(req, err); > + > + req = req->priv; > + > + dprintk("%s: req: %p, priv: %p err: %d, bio: %p, " > + "cnt: %d, orig_size: %llu.\n", > + __func__, req, req->priv, err, req->bio, > + atomic_read(&req->refcnt), req->orig_size); > + > + if (atomic_dec_and_test(&req->refcnt)) { > + dprintk("%s: freeing bio %p.\n", __func__, req->bio); > + bio_endio(req->bio, req->orig_size, 0); > + dst_free_request(req); > + } > +} > + > +static int dst_mirror_process_request(struct dst_request *req, > + struct dst_node *n) > +{ > + int err = 0; > + > + /* > + * Block layer requires to clone a bio. > + */ > + if (n->bdev) { > + struct bio *clone = bio_alloc_bioset(GFP_NOIO, > + req->bio->bi_max_vecs, dst_mirror_bio_set); > + > + __bio_clone(clone, req->bio); > + > + clone->bi_bdev = n->bdev; > + clone->bi_destructor = dst_mirror_destructor; > + clone->bi_private = req; > + clone->bi_end_io = &dst_mirror_end_io; > + > + dprintk("%s: clone: %p, bio: %p, req: %p.\n", > + __func__, clone, req->bio, req); > + > + generic_make_request(clone); > + } else { > + struct dst_request nr; > + /* > + * Network state processing engine will clone request > + * by itself if needed. We can not use the same structure > + * here, since number of its fields will be modified. > + */ > + memcpy(&nr, req, sizeof(struct dst_request)); > + > + nr.node = n; > + nr.state = n->state; > + nr.priv = req; > + > + err = kst_check_permissions(n->state, req->bio); > + if (!err) > + err = req->state->ops->push(&nr); > + } > + > + dprintk("%s: req: %p, n: %p, bdev: %p, err: %d.\n", > + __func__, req, n, n->bdev, err); > + return err; > +} > + > +static int dst_mirror_write(struct dst_request *oreq) > +{ > + struct dst_node *n, *node = req->node; > + int num, err = 0, err_num = 0, orig_num; > + > + req = dst_clone_request(oreq, oreq->node->w->req_pool); > + if (!req) { > + kst_bio_endio(oreq, -ENOMEM); > + return -ENOMEM; > + } > + > + req->priv = req; > + > + /* > + * This logic is pretty simple - req->bio_endio will not > + * call bio_endio() until all mirror devices completed > + * processing of the request (no matter with or without error). > + * Mirror's req->bio_endio callback will take care of that. > + */ > + orig_num = num = atomic_read(&req->node->shared_num) + 1; > + atomic_set(&req->refcnt, num); > + > + req->bio_endio = &dst_mirror_write_endio; > + > + dprintk("\n%s: req: %p, mirror to %d nodes.\n", > + __func__, req, num); > + > + err = dst_mirror_process_request(req, node); > + if (err) > + err_num++; > + > + if (--num) { > + list_for_each_entry_rcu(n, &node->shared, shared) { This function is called under rcu_read_lock() or similar, right? (Can't tell from this patch.) It is also OK to call it from under the update-side mutex, of course. > + dprintk("\n%s: req: %p, start: %llu, size: %llu, " > + "num: %d, n: %p.\n", > + __func__, req, req->start, > + req->size, num, n); > + > + err = dst_mirror_process_request(req, n); > + if (err) > + err_num++; > + > + if (--num <= 0) > + break; > + } > + } > + > + if (err_num == orig_num) { > + dprintk("%s: req: %p, num: %d, err: %d.\n", > + __func__, req, num, err); > + return -ENODEV; > + } > + > + return 0; > +} > + > +static int dst_mirror_read(struct dst_request *req) > +{ > + struct dst_node *node = req->node, *n, *min_dist_node; > + struct dst_mirror_priv *priv = node->priv; > + u64 dist, d; > + int err; > + > + req->bio_endio = &dst_mirror_read_endio; > + > + do { > + err = -ENODEV; > + min_dist_node = NULL; > + dist = -1ULL; > + > + /* > + * Reading is never performed from the node under resync. > + * If this will cause any troubles (like all nodes must be > + * resynced between each other), this check can be removed > + * and per-chunk dirty bit can be tested instead. > + */ > + > + if (!test_bit(DST_NODE_NOTSYNC, &node->flags)) { > + priv = node->priv; > + if (req->start > priv->last_start) > + dist = req->start - priv->last_start; > + else > + dist = priv->last_start - req->start; > + min_dist_node = req->node; > + } > + > + list_for_each_entry_rcu(n, &node->shared, shared) { I see one call to this function that appears to be under the update-side mutex, but I cannot tell if the other calls are safe. (Safe as in either under the update-side mutex or under rcu_read_lock() and friends.) > + if (test_bit(DST_NODE_NOTSYNC, &n->flags)) > + continue; > + > + priv = n->priv; > + > + if (req->start > priv->last_start) > + d = req->start - priv->last_start; > + else > + d = priv->last_start - req->start; > + > + if (d < dist) > + min_dist_node = n; > + } > + > + if (!min_dist_node) > + break; > + > + req->node = min_dist_node; > + req->state = req->node->state; > + > + if (req->node->bdev) { > + req->bio->bi_bdev = req->node->bdev; > + generic_make_request(req->bio); > + err = 0; > + break; > + } > + > + err = req->state->ops->push(req); > + if (err) { > + printk("%s: 1 req: %p, bio: %p, node: %p, err: %d.\n", > + __func__, req, req->bio, min_dist_node, err); > + dst_mirror_mark_notsync(req->node); > + } > + } while (err && min_dist_node); > + > + if (err) { > + printk("%s: req: %p, bio: %p, node: %p, err: %d.\n", > + __func__, req, req->bio, min_dist_node, err); > + kst_bio_endio(req, err); > + } > + return err; > +} > + > +/* > + * This callback is invoked from block layer request processing function, > + * its task is to remap block request to different nodes. > + */ > +static int dst_mirror_remap(struct dst_request *req) > +{ > + int (*remap[])(struct dst_request *) = > + {&dst_mirror_read, &dst_mirror_write}; > + > + return remap[bio_rw(req->bio) == WRITE](req); > +} > + > +static int dst_mirror_error(struct kst_state *st, int err) > +{ > + struct dst_request *req, *tmp; > + unsigned int revents = st->socket->ops->poll(NULL, st->socket, NULL); > + > + if (err == -EEXIST) > + return err; > + > + if (!(revents & (POLLERR | POLLHUP))) { > + if (test_bit(DST_NODE_NOTSYNC, &st->node->flags)) { > + return dst_mirror_resync(st->node); > + } > + return 0; > + } > + > + dst_mirror_mark_notsync(st->node); > + > + mutex_lock(&st->request_lock); > + list_for_each_entry_safe(req, tmp, &st->request_list, > + request_list_entry) { > + kst_del_req(req); > + dprintk("%s: requeue [%c], start: %llu, idx: %d," > + " num: %d, size: %llu, offset: %u, err: %d.\n", > + __func__, (bio_rw(req->bio) == WRITE)?'W':'R', > + req->start, req->idx, req->num, req->size, > + req->offset, err); > + > + if (bio_rw(req->bio) == READ) { > + req->start -= to_sector(req->orig_size - req->size); > + req->size = req->orig_size; > + req->flags &= ~DST_REQ_HEADER_SENT; > + req->idx = 0; > + if (dst_mirror_read(req)) > + kst_complete_req(req, err); > + else > + dst_free_request(req); > + } else { > + kst_complete_req(req, err); > + } > + } > + mutex_unlock(&st->request_lock); > + return err; > +} > + > +static struct dst_alg_ops alg_mirror_ops = { > + .remap = dst_mirror_remap, > + .add_node = dst_mirror_add_node, > + .del_node = dst_mirror_del_node, > + .error = dst_mirror_error, > + .owner = THIS_MODULE, > +}; > + > +static int __devinit alg_mirror_init(void) > +{ > + int err = -ENOMEM; > + > + dst_mirror_bio_set = bioset_create(256, 256); > + if (!dst_mirror_bio_set) > + return -ENOMEM; > + > + alg_mirror = dst_alloc_alg("alg_mirror", &alg_mirror_ops); > + if (!alg_mirror) > + goto err_out; > + > + return 0; > + > +err_out: > + bioset_free(dst_mirror_bio_set); > + return err; > +} > + > +static void __devexit alg_mirror_exit(void) > +{ > + dst_remove_alg(alg_mirror); > + bioset_free(dst_mirror_bio_set); > +} > + > +module_init(alg_mirror_init); > +module_exit(alg_mirror_exit); > + > +MODULE_LICENSE("GPL"); > +MODULE_AUTHOR("Evgeniy Polyakov "); > +MODULE_DESCRIPTION("Mirror distributed algorithm."); > diff --git a/drivers/block/dst/dcore.c b/drivers/block/dst/dcore.c > new file mode 100644 > index 0000000..2bf7fc1 > --- /dev/null > +++ b/drivers/block/dst/dcore.c > @@ -0,0 +1,1526 @@ > +/* > + * 2007+ Copyright (c) Evgeniy Polyakov > + * All rights reserved. > + * > + * This program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU General Public License as published by > + * the Free Software Foundation; either version 2 of the License, or > + * (at your option) any later version. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU General Public License for more details. > + */ > + > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > + > +#include > + > +static LIST_HEAD(dst_storage_list); > +static LIST_HEAD(dst_alg_list); > +static DEFINE_MUTEX(dst_storage_lock); > +static DEFINE_MUTEX(dst_alg_lock); > +static int dst_major; > +static struct kst_worker *kst_main_worker; > + > +struct kmem_cache *dst_request_cache; > + > +/* > + * DST sysfs tree. For device called 'storage' which is formed > + * on top of two nodes this looks like this: > + * > + * /sys/devices/storage/ > + * /sys/devices/storage/alg : alg_linear > + * /sys/devices/storage/n-800/type : R: 192.168.4.80:1025 > + * /sys/devices/storage/n-800/size : 800 > + * /sys/devices/storage/n-800/start : 800 > + * /sys/devices/storage/n-0/type : R: 192.168.4.81:1025 > + * /sys/devices/storage/n-0/size : 800 > + * /sys/devices/storage/n-0/start : 0 > + * /sys/devices/storage/remove_all_nodes > + * /sys/devices/storage/nodes : sectors (start [size]): 0 [800] | 800 [800] > + * /sys/devices/storage/name : storage > + */ > + > +static int dst_dev_match(struct device *dev, struct device_driver *drv) > +{ > + return 1; > +} > + > +static void dst_dev_release(struct device *dev) > +{ > +} > + > +static struct bus_type dst_dev_bus_type = { > + .name = "dst", > + .match = &dst_dev_match, > +}; > + > +static struct device dst_dev = { > + .bus = &dst_dev_bus_type, > + .release = &dst_dev_release > +}; > + > +static void dst_node_release(struct device *dev) > +{ > +} > + > +static struct device dst_node_dev = { > + .release = &dst_node_release > +}; > + > +static struct bio_set *dst_bio_set; > + > +static void dst_destructor(struct bio *bio) > +{ > + bio_free(bio, dst_bio_set); > +} > + > +/* > + * Internal callback for local requests (i.e. for local disk), > + * which are splitted between nodes (part with local node destination > + * ends up with this ->bi_end_io() callback). > + */ > +static int dst_end_io(struct bio *bio, unsigned int size, int err) > +{ > + struct bio *orig_bio = bio->bi_private; > + > + if (bio->bi_size) > + return 0; > + > + dprintk("%s: bio: %p, orig_bio: %p, size: %u, orig_size: %u.\n", > + __func__, bio, orig_bio, size, orig_bio->bi_size); > + > + bio_endio(orig_bio, size, 0); > + bio_put(bio); > + return 0; > +} > + > +/* > + * This function sends processing request down to block layer (for local node) > + * or to network state machine (for remote node). > + */ > +static int dst_node_push(struct dst_request *req) > +{ > + int err = 0; > + struct dst_node *n = req->node; > + > + if (n->bdev) { > + struct bio *bio = req->bio; > + > + dprintk("%s: start: %llu, num: %d, idx: %d, offset: %u, " > + "size: %llu, bi_idx: %d, bi_vcnt: %d.\n", > + __func__, req->start, req->num, req->idx, > + req->offset, req->size, bio->bi_idx, bio->bi_vcnt); > + > + if (likely(bio->bi_idx == req->idx && > + bio->bi_vcnt == req->num)) { > + bio->bi_bdev = n->bdev; > + bio->bi_sector = req->start; > + } else { > + struct bio *clone = bio_alloc_bioset(GFP_NOIO, > + bio->bi_max_vecs, dst_bio_set); > + struct bio_vec *bv; > + > + err = -ENOMEM; > + if (!clone) > + goto out_put; > + > + __bio_clone(clone, bio); > + > + bv = bio_iovec_idx(clone, req->idx); > + bv->bv_offset += req->offset; > + clone->bi_idx = req->idx; > + clone->bi_vcnt = req->num; > + clone->bi_bdev = n->bdev; > + clone->bi_sector = req->start; > + clone->bi_destructor = dst_destructor; > + clone->bi_private = bio; > + clone->bi_size = req->orig_size; > + clone->bi_end_io = &dst_end_io; > + req->bio = clone; > + > + dprintk("%s: start: %llu, num: %d, idx: %d, " > + "offset: %u, size: %llu, " > + "bi_idx: %d, bi_vcnt: %d, req: %p, bio: %p.\n", > + __func__, req->start, req->num, req->idx, > + req->offset, req->size, > + clone->bi_idx, clone->bi_vcnt, req, req->bio); > + > + } > + } > + > + err = n->st->alg->ops->remap(req); > + > +out_put: > + dst_node_put(n); > + return err; > +} > + > +/* > + * This function is invoked from block layer request processing function, > + * its task is to remap block request to different nodes. > + */ > +static int dst_remap(struct dst_storage *st, struct bio *bio) > +{ > + struct dst_node *n; > + int err = -EINVAL, i, cnt; > + unsigned int bio_sectors = bio->bi_size>>9; > + struct bio_vec *bv; > + struct dst_request req; > + u64 rest_in_node, start, total_size; > + > + mutex_lock(&st->tree_lock); > + n = dst_storage_tree_search(st, bio->bi_sector); > + mutex_unlock(&st->tree_lock); > + > + if (!n) { > + dprintk("%s: failed to find a node for bio: %p, " > + "sector: %llu.\n", > + __func__, bio, bio->bi_sector); > + return -ENODEV; > + } > + > + dprintk("%s: bio: %llu-%llu, dev: %llu-%llu, in sectors.\n", > + __func__, bio->bi_sector, bio->bi_sector+bio_sectors, > + n->start, n->start+n->size); > + > + memset(&req, 0, sizeof(struct dst_request)); > + > + start = bio->bi_sector; > + total_size = bio->bi_size; > + > + req.flags = (test_bit(DST_NODE_FROZEN, &n->flags))? > + DST_REQ_ALWAYS_QUEUE:0; > + req.start = start - n->start; > + req.offset = 0; > + req.state = n->state; > + req.node = n; > + req.bio = bio; > + > + req.size = bio->bi_size; > + req.orig_size = bio->bi_size; > + req.idx = bio->bi_idx; > + req.num = bio->bi_vcnt; > + > + req.bio_endio = &kst_bio_endio; > + > + /* > + * Common fast path - block request does not cross > + * boundaries between nodes. > + */ > + if (likely(bio->bi_sector + bio_sectors <= n->start + n->size)) > + return dst_node_push(&req); > + > + req.size = 0; > + req.idx = 0; > + req.num = 1; > + > + cnt = bio->bi_vcnt; > + > + rest_in_node = to_bytes(n->size - req.start); > + > + for (i = 0; i < cnt; ++i) { > + bv = bio_iovec_idx(bio, i); > + > + if (req.size + bv->bv_len >= rest_in_node) { > + unsigned int diff = req.size + bv->bv_len - > + rest_in_node; > + > + req.size += bv->bv_len - diff; > + req.start = start - n->start; > + req.orig_size = req.size; > + req.bio = bio; > + req.bio_endio = &kst_bio_endio; > + > + dprintk("%s: split: start: %llu/%llu, size: %llu, " > + "total_size: %llu, diff: %u, idx: %d, " > + "num: %d, bv_len: %u, bv_offset: %u.\n", > + __func__, start, req.start, req.size, > + total_size, diff, req.idx, req.num, > + bv->bv_len, bv->bv_offset); > + > + err = dst_node_push(&req); > + if (err) > + break; > + > + total_size -= req.orig_size; > + > + if (!total_size) > + break; > + > + start += to_sector(req.orig_size); > + > + req.flags = (test_bit(DST_NODE_FROZEN, &n->flags))? > + DST_REQ_ALWAYS_QUEUE:0; > + req.orig_size = req.size = diff; > + > + if (diff) { > + req.offset = bv->bv_len - diff; > + req.idx = req.num - 1; > + } else { > + req.idx = req.num; > + req.offset = 0; > + } > + > + dprintk("%s: next: start: %llu, size: %llu, " > + "total_size: %llu, diff: %u, idx: %d, " > + "num: %d, offset: %u, bv_len: %u, " > + "bv_offset: %u.\n", > + __func__, start, req.size, total_size, diff, > + req.idx, req.num, req.offset, > + bv->bv_len, bv->bv_offset); > + > + mutex_lock(&st->tree_lock); > + n = dst_storage_tree_search(st, start); > + mutex_unlock(&st->tree_lock); > + > + if (!n) { > + err = -ENODEV; > + dprintk("%s: failed to find a split node for " > + "bio: %p, sector: %llu, start: %llu.\n", > + __func__, bio, bio->bi_sector, > + req.start); > + break; > + } > + > + req.state = n->state; > + req.node = n; > + req.start = start - n->start; > + rest_in_node = to_bytes(n->size - req.start); > + > + dprintk("%s: req.start: %llu, start: %llu, " > + "dev_start: %llu, dev_size: %llu, " > + "rest_in_node: %llu.\n", > + __func__, req.start, start, n->start, > + n->size, rest_in_node); > + } else { > + req.size += bv->bv_len; > + req.num++; > + } > + } > + > + dprintk("%s: last request: start: %llu, size: %llu, " > + "total_size: %llu.\n", __func__, > + req.start, req.size, total_size); > + if (total_size) { > + req.orig_size = req.size; > + req.bio = bio; > + req.bio_endio = &kst_bio_endio; > + > + dprintk("%s: last: start: %llu/%llu, size: %llu, " > + "total_size: %llu, idx: %d, num: %d.\n", > + __func__, start, req.start, req.size, > + total_size, req.idx, req.num); > + > + err = dst_node_push(&req); > + if (!err) { > + total_size -= req.orig_size; > + > + BUG_ON(total_size != 0); > + } > + } > + > + dprintk("%s: end bio: %p, err: %d.\n", __func__, bio, err); > + return err; > +} > + > + > +/* > + * Distributed storage erquest processing function. > + * It calls algorithm spcific remapping code only. > + */ > +static int dst_request(request_queue_t *q, struct bio *bio) > +{ > + struct dst_storage *st = q->queuedata; > + int err; > + > + dprintk("\n%s: start: st: %p, bio: %p, cnt: %u.\n", > + __func__, st, bio, bio->bi_vcnt); > + > + err = dst_remap(st, bio); > + > + dprintk("%s: end: st: %p, bio: %p, err: %d.\n", > + __func__, st, bio, err); > + return 0; > +} > + > +static void dst_unplug(request_queue_t *q) > +{ > +} > + > +static int dst_flush(request_queue_t *q, struct gendisk *disk, sector_t *sec) > +{ > + return 0; > +} > + > +static struct block_device_operations dst_blk_ops = { > + .owner = THIS_MODULE, > +}; > + > +/* > + * Block layer binding - disk is created when array is fully configured > + * by userspace request. > + */ > +static int dst_create_disk(struct dst_storage *st) > +{ > + int err = -ENOMEM; > + > + st->queue = blk_alloc_queue(GFP_KERNEL); > + if (!st->queue) > + goto err_out_exit; > + > + st->queue->queuedata = st; > + blk_queue_make_request(st->queue, dst_request); > + blk_queue_bounce_limit(st->queue, BLK_BOUNCE_ANY); > + st->queue->unplug_fn = dst_unplug; > + st->queue->issue_flush_fn = dst_flush; > + > + err = -EINVAL; > + st->disk = alloc_disk(1); > + if (!st->disk) > + goto err_out_free_queue; > + > + st->disk->major = dst_major; > + st->disk->first_minor = (((unsigned long)st->disk) ^ > + (((unsigned long)st->disk) >> 31)) & 0xff; > + st->disk->fops = &dst_blk_ops; > + st->disk->queue = st->queue; > + st->disk->private_data = st; > + snprintf(st->disk->disk_name, sizeof(st->disk->disk_name), > + "dst-%s-%d", st->name, st->disk->first_minor); > + > + return 0; > + > +err_out_free_queue: > + blk_cleanup_queue(st->queue); > +err_out_exit: > + return err; > +} > + > +static void dst_remove_disk(struct dst_storage *st) > +{ > + del_gendisk(st->disk); > + put_disk(st->disk); > + blk_cleanup_queue(st->queue); > +} > + > +/* > + * Shows node name in sysfs. > + */ > +static ssize_t dst_name_show(struct device *dev, > + struct device_attribute *attr, char *buf) > +{ > + struct dst_storage *st = container_of(dev, struct dst_storage, device); > + > + return sprintf(buf, "%s\n", st->name); > +} > + > +static void dst_remove_all_nodes(struct dst_storage *st) > +{ > + struct dst_node *n, *node, *tmp; > + struct rb_node *rb_node; > + > + mutex_lock(&st->tree_lock); > + while ((rb_node = rb_first(&st->tree_root)) != NULL) { > + n = rb_entry(rb_node, struct dst_node, tree_node); > + dprintk("%s: n: %p, start: %llu, size: %llu.\n", > + __func__, n, n->start, n->size); > + rb_erase(&n->tree_node, &st->tree_root); > + if (!n->shared_head && atomic_read(&n->shared_num)) { > + list_for_each_entry_safe(node, tmp, &n->shared, shared) { > + list_del_rcu(&node->shared); Under the update-side mutex, so OK. > + atomic_dec(&node->shared_head->refcnt); > + node->shared_head = NULL; > + dst_node_put(node); > + } > + } > + dst_node_put(n); > + } > + mutex_unlock(&st->tree_lock); > +} > + > +/* > + * Shows node layout in syfs. > + */ > +static ssize_t dst_nodes_show(struct device *dev, > + struct device_attribute *attr, char *buf) > +{ > + struct dst_storage *st = container_of(dev, struct dst_storage, device); > + int size = PAGE_CACHE_SIZE, sz; > + struct dst_node *n; > + struct rb_node *rb_node; > + > + sz = sprintf(buf, "sectors (start [size]): "); > + size -= sz; > + buf += sz; > + > + mutex_lock(&st->tree_lock); > + for (rb_node = rb_first(&st->tree_root); rb_node; > + rb_node = rb_next(rb_node)) { > + n = rb_entry(rb_node, struct dst_node, tree_node); > + if (size < 32) > + break; > + sz = sprintf(buf, "%llu [%llu]", n->start, n->size); > + buf += sz; > + size -= sz; > + > + if (!rb_next(rb_node)) > + break; > + > + sz = sprintf(buf, " | "); > + buf += sz; > + size -= sz; > + } > + mutex_unlock(&st->tree_lock); > + size -= sprintf(buf, "\n"); > + return PAGE_CACHE_SIZE - size; > +} > + > +/* > + * Algorithm currently being used by given storage. > + */ > +static ssize_t dst_alg_show(struct device *dev, > + struct device_attribute *attr, char *buf) > +{ > + struct dst_storage *st = container_of(dev, struct dst_storage, device); > + return sprintf(buf, "%s\n", st->alg->name); > +} > + > +/* > + * Writing to this sysfs file allows to remove all nodes > + * and storage itself automatically. > + */ > +static ssize_t dst_remove_nodes(struct device *dev, > + struct device_attribute *attr, > + const char *buf, size_t count) > +{ > + struct dst_storage *st = container_of(dev, struct dst_storage, device); > + dst_remove_all_nodes(st); > + return count; > +} > + > +static DEVICE_ATTR(name, 0444, dst_name_show, NULL); > +static DEVICE_ATTR(nodes, 0444, dst_nodes_show, NULL); > +static DEVICE_ATTR(alg, 0444, dst_alg_show, NULL); > +static DEVICE_ATTR(remove_all_nodes, 0644, NULL, dst_remove_nodes); > + > +static int dst_create_storage_attributes(struct dst_storage *st) > +{ > + int err; > + > + err = device_create_file(&st->device, &dev_attr_name); > + err = device_create_file(&st->device, &dev_attr_nodes); > + err = device_create_file(&st->device, &dev_attr_alg); > + err = device_create_file(&st->device, &dev_attr_remove_all_nodes); > + return 0; > +} > + > +static void dst_remove_storage_attributes(struct dst_storage *st) > +{ > + device_remove_file(&st->device, &dev_attr_name); > + device_remove_file(&st->device, &dev_attr_nodes); > + device_remove_file(&st->device, &dev_attr_alg); > + device_remove_file(&st->device, &dev_attr_remove_all_nodes); > +} > + > +static void dst_storage_sysfs_exit(struct dst_storage *st) > +{ > + dst_remove_storage_attributes(st); > + device_unregister(&st->device); > +} > + > +static int dst_storage_sysfs_init(struct dst_storage *st) > +{ > + int err; > + > + memcpy(&st->device, &dst_dev, sizeof(struct device)); > + snprintf(st->device.bus_id, sizeof(st->device.bus_id), "%s", st->name); > + > + err = device_register(&st->device); > + if (err) { > + dprintk(KERN_ERR "Failed to register dst device %s, err: %d.\n", > + st->name, err); > + goto err_out_exit; > + } > + > + dst_create_storage_attributes(st); > + > + return 0; > + > +err_out_exit: > + return err; > +} > + > +/* > + * This functions shows size and start of the appropriate node. > + * Both are in sectors. > + */ > +static ssize_t dst_show_start(struct device *dev, > + struct device_attribute *attr, char *buf) > +{ > + struct dst_node *n = container_of(dev, struct dst_node, device); > + > + return sprintf(buf, "%llu\n", n->start); > +} > + > +static ssize_t dst_show_size(struct device *dev, > + struct device_attribute *attr, char *buf) > +{ > + struct dst_node *n = container_of(dev, struct dst_node, device); > + > + return sprintf(buf, "%llu\n", n->size); > +} > + > +/* > + * Shows type of the remote node - device major/minor number > + * for local nodes and address (af_inet ipv4/ipv6 only) for remote nodes. > + */ > +static ssize_t dst_show_type(struct device *dev, > + struct device_attribute *attr, char *buf) > +{ > + struct dst_node *n = container_of(dev, struct dst_node, device); > + struct sockaddr addr; > + struct socket *sock; > + int addrlen; > + > + if (!n->state && !n->bdev) > + return 0; > + > + if (n->bdev) > + return sprintf(buf, "L: %d:%d\n", > + MAJOR(n->bdev->bd_dev), MINOR(n->bdev->bd_dev)); > + > + sock = n->state->socket; > + if (sock->ops->getname(sock, &addr, &addrlen, 2)) > + return 0; > + > + if (sock->ops->family == AF_INET) { > + struct sockaddr_in *sin = (struct sockaddr_in *)&addr; > + return sprintf(buf, "R: %u.%u.%u.%u:%d\n", > + NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port)); > + } else if (sock->ops->family == AF_INET6) { > + struct sockaddr_in6 *sin = (struct sockaddr_in6 *)&addr; > + return sprintf(buf, > + "R: %04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d\n", > + NIP6(sin->sin6_addr), ntohs(sin->sin6_port)); > + } > + return 0; > +} > + > +static DEVICE_ATTR(start, 0444, dst_show_start, NULL); > +static DEVICE_ATTR(size, 0444, dst_show_size, NULL); > +static DEVICE_ATTR(type, 0444, dst_show_type, NULL); > + > +static int dst_create_node_attributes(struct dst_node *n) > +{ > + int err; > + > + err = device_create_file(&n->device, &dev_attr_start); > + err = device_create_file(&n->device, &dev_attr_size); > + err = device_create_file(&n->device, &dev_attr_type); > + return 0; > +} > + > +static void dst_remove_node_attributes(struct dst_node *n) > +{ > + device_remove_file(&n->device, &dev_attr_start); > + device_remove_file(&n->device, &dev_attr_size); > + device_remove_file(&n->device, &dev_attr_type); > +} > + > +static void dst_node_sysfs_exit(struct dst_node *n) > +{ > + if (n->device.parent == &n->st->device) { > + dst_remove_node_attributes(n); > + device_unregister(&n->device); > + n->device.parent = NULL; > + } > +} > + > +static int dst_node_sysfs_init(struct dst_node *n) > +{ > + int err; > + > + memcpy(&n->device, &dst_node_dev, sizeof(struct device)); > + > + n->device.parent = &n->st->device; > + > + snprintf(n->device.bus_id, sizeof(n->device.bus_id), > + "n-%llu-%p", n->start, n); > + err = device_register(&n->device); > + if (err) { > + dprintk(KERN_ERR "Failed to register node, err: %d.\n", err); > + goto err_out_exit; > + } > + > + dst_create_node_attributes(n); > + > + return 0; > + > +err_out_exit: > + n->device.parent = NULL; > + return err; > +} > + > +/* > + * Gets a reference for given storage, if > + * storage with given name and algorithm being used > + * does not exist it is created. > + */ > +static struct dst_storage *dst_get_storage(char *name, char *aname, int alloc) > +{ > + struct dst_storage *st, *rst = NULL; > + int err; > + struct dst_alg *alg; > + > + mutex_lock(&dst_storage_lock); > + list_for_each_entry(st, &dst_storage_list, entry) { > + if (!strcmp(name, st->name) && !strcmp(st->alg->name, aname)) { > + rst = st; > + atomic_inc(&st->refcnt); > + break; > + } > + } > + mutex_unlock(&dst_storage_lock); > + > + if (rst || !alloc) > + return rst; > + > + st = kzalloc(sizeof(struct dst_storage), GFP_KERNEL); > + if (!st) > + return NULL; > + > + mutex_init(&st->tree_lock); > + /* > + * One for storage itself, > + * another one for attached node below. > + */ > + atomic_set(&st->refcnt, 2); > + snprintf(st->name, DST_NAMELEN, "%s", name); > + st->tree_root.rb_node = NULL; > + > + err = dst_storage_sysfs_init(st); > + if (err) > + goto err_out_free; > + > + err = dst_create_disk(st); > + if (err) > + goto err_out_sysfs_exit; > + > + mutex_lock(&dst_alg_lock); > + list_for_each_entry(alg, &dst_alg_list, entry) { > + if (!strcmp(alg->name, aname)) { > + atomic_inc(&alg->refcnt); > + try_module_get(alg->ops->owner); > + st->alg = alg; > + break; > + } > + } > + mutex_unlock(&dst_alg_lock); > + > + if (!st->alg) > + goto err_out_disk_remove; > + > + mutex_lock(&dst_storage_lock); > + list_add_tail(&st->entry, &dst_storage_list); > + mutex_unlock(&dst_storage_lock); > + > + return st; > + > +err_out_disk_remove: > + dst_remove_disk(st); > +err_out_sysfs_exit: > + dst_storage_sysfs_init(st); > +err_out_free: > + kfree(st); > + return NULL; > +} > + > +/* > + * Allows to allocate and add new algorithm by external modules. > + */ > +struct dst_alg *dst_alloc_alg(char *name, struct dst_alg_ops *ops) > +{ > + struct dst_alg *alg; > + > + alg = kzalloc(sizeof(struct dst_alg), GFP_KERNEL); > + if (!alg) > + return NULL; > + snprintf(alg->name, DST_NAMELEN, "%s", name); > + atomic_set(&alg->refcnt, 1); > + alg->ops = ops; > + > + mutex_lock(&dst_alg_lock); > + list_add_tail(&alg->entry, &dst_alg_list); > + mutex_unlock(&dst_alg_lock); > + > + return alg; > +} > +EXPORT_SYMBOL_GPL(dst_alloc_alg); > + > +static void dst_free_alg(struct dst_alg *alg) > +{ > + dprintk("%s: alg: %p.\n", __func__, alg); > + kfree(alg); > +} > + > +/* > + * Algorithm is never freed directly, > + * since its module reference counter is increased > + * by storage when it is created - just like network protocols. > + */ > +static inline void dst_put_alg(struct dst_alg *alg) > +{ > + dprintk("%s: alg: %p, refcnt: %d.\n", > + __func__, alg, atomic_read(&alg->refcnt)); > + module_put(alg->ops->owner); > + if (atomic_dec_and_test(&alg->refcnt)) > + dst_free_alg(alg); > +} > + > +/* > + * Removing algorithm from main list of supported algorithms. > + */ > +void dst_remove_alg(struct dst_alg *alg) > +{ > + mutex_lock(&dst_alg_lock); > + list_del_init(&alg->entry); > + mutex_unlock(&dst_alg_lock); > + > + dst_put_alg(alg); > +} > +EXPORT_SYMBOL_GPL(dst_remove_alg); > + > +static void dst_cleanup_node(struct dst_node *n) > +{ > + struct dst_storage *st = n->st; > + > + dprintk("%s: node: %p.\n", __func__, n); > + > + n->st->alg->ops->del_node(n); > + > + if (n->shared_head) { > + mutex_lock(&st->tree_lock); > + list_del_rcu(&n->shared); Under the update-side mutex, so OK. > + mutex_unlock(&st->tree_lock); > + > + atomic_dec(&n->shared_head->refcnt); > + dst_node_put(n->shared_head); > + n->shared_head = NULL; > + } > + > + if (n->cleanup) > + n->cleanup(n); > + dst_node_sysfs_exit(n); > + kfree(n); > +} > + > +static void dst_free_storage(struct dst_storage *st) > +{ > + dprintk("%s: st: %p.\n", __func__, st); > + > + BUG_ON(rb_first(&st->tree_root) != NULL); > + > + dst_put_alg(st->alg); > + kfree(st); > +} > + > +static inline void dst_put_storage(struct dst_storage *st) > +{ > + dprintk("%s: st: %p, refcnt: %d.\n", > + __func__, st, atomic_read(&st->refcnt)); > + if (atomic_dec_and_test(&st->refcnt)) > + dst_free_storage(st); > +} > + > +void dst_node_put(struct dst_node *n) > +{ > + dprintk("%s: node: %p, start: %llu, size: %llu, refcnt: %d.\n", > + __func__, n, n->start, n->size, > + atomic_read(&n->refcnt)); > + > + if (atomic_dec_and_test(&n->refcnt)) { > + struct dst_storage *st = n->st; > + > + dprintk("%s: freeing node: %p, start: %llu, size: %llu, " > + "refcnt: %d.\n", > + __func__, n, n->start, n->size, > + atomic_read(&n->refcnt)); > + > + dst_cleanup_node(n); > + dst_put_storage(st); > + } > +} > +EXPORT_SYMBOL_GPL(dst_node_put); > + > +static inline int dst_compare_id(struct dst_node *old, u64 new) > +{ > + if (old->start + old->size <= new) > + return 1; > + if (old->start > new) > + return -1; > + return 0; > +} > + > +/* > + * Tree of of the nodes, which form the storage. > + * Tree is indexed via start of the node and its size. > + * Comparison function above. > + */ > +struct dst_node *dst_storage_tree_search(struct dst_storage *st, u64 start) > +{ > + struct rb_node *n = st->tree_root.rb_node; > + struct dst_node *dn; > + int cmp; > + > + while (n) { > + dn = rb_entry(n, struct dst_node, tree_node); > + > + cmp = dst_compare_id(dn, start); > + dprintk("%s: tree: %llu-%llu, new: %llu.\n", > + __func__, dn->start, dn->start+dn->size, start); > + if (cmp < 0) > + n = n->rb_left; > + else if (cmp > 0) > + n = n->rb_right; > + else { > + return dst_node_get(dn); > + } > + } > + return NULL; > +} > +EXPORT_SYMBOL_GPL(dst_storage_tree_search); > + > +/* > + * This function allows to remove a node with given start address > + * from the storage. > + */ > +static struct dst_node *dst_storage_tree_del(struct dst_storage *st, u64 start) > +{ > + struct dst_node *n = dst_storage_tree_search(st, start); > + > + if (!n) > + return NULL; > + > + rb_erase(&n->tree_node, &st->tree_root); > + dst_node_put(n); > + return n; > +} > + > +/* > + * This function allows to add given node to the storage. > + * Returns -EEXIST if the same area is already covered by another node. > + * This is return must be checked for redundancy algorithms. > + */ > +static struct dst_node *dst_storage_tree_add(struct dst_node *new, > + struct dst_storage *st) > +{ > + struct rb_node **n = &st->tree_root.rb_node, *parent = NULL; > + struct dst_node *dn; > + int cmp; > + > + while (*n) { > + parent = *n; > + dn = rb_entry(parent, struct dst_node, tree_node); > + > + cmp = dst_compare_id(dn, new->start); > + dprintk("%s: tree: %llu-%llu, new: %llu.\n", > + __func__, dn->start, dn->start+dn->size, > + new->start); > + if (cmp < 0) > + n = &parent->rb_left; > + else if (cmp > 0) > + n = &parent->rb_right; > + else { > + return dn; > + } > + } > + > + rb_link_node(&new->tree_node, parent, n); > + rb_insert_color(&new->tree_node, &st->tree_root); > + > + return NULL; > +} > + > +/* > + * This function finds devices major/minor numbers for given pathname. > + */ > +static int dst_lookup_device(const char *path, dev_t *dev) > +{ > + int err; > + struct nameidata nd; > + struct inode *inode; > + > + err = path_lookup(path, LOOKUP_FOLLOW, &nd); > + if (err) > + return err; > + > + inode = nd.dentry->d_inode; > + if (!inode) { > + err = -ENOENT; > + goto out; > + } > + > + if (!S_ISBLK(inode->i_mode)) { > + err = -ENOTBLK; > + goto out; > + } > + > + *dev = inode->i_rdev; > + > +out: > + path_release(&nd); > + return err; > +} > + > +/* > + * Cleanup routings for local, local exporting and remote nodes. > + */ > +static void dst_cleanup_remote(struct dst_node *n) > +{ > + if (n->state) { > + kst_state_exit(n->state); > + n->state = NULL; > + } > +} > + > +static void dst_cleanup_local(struct dst_node *n) > +{ > + if (n->bdev) { > + sync_blockdev(n->bdev); > + blkdev_put(n->bdev); > + n->bdev = NULL; > + } > +} > + > +static void dst_cleanup_local_export(struct dst_node *n) > +{ > + dst_cleanup_local(n); > + dst_cleanup_remote(n); > +} > + > +/* > + * Setup routings for local, local exporting and remote nodes. > + */ > +static int dst_setup_local(struct dst_node *n, struct dst_ctl *ctl, > + struct dst_local_ctl *l) > +{ > + dev_t dev; > + int err; > + > + err = dst_lookup_device(l->name, &dev); > + if (err) > + return err; > + > + n->bdev = open_by_devnum(dev, FMODE_READ|FMODE_WRITE); > + if (!n->bdev) > + return -ENODEV; > + > + if (!n->size) > + n->size = get_capacity(n->bdev->bd_disk); > + > + return 0; > +} > + > +static int dst_setup_local_export(struct dst_node *n, struct dst_ctl *ctl, > + struct dst_le_template *tmp) > +{ > + int err; > + > + err = dst_setup_local(n, ctl, &tmp->le.lctl); > + if (err) > + goto err_out_exit; > + > + n->state = kst_listener_state_init(n, tmp); > + if (IS_ERR(n->state)) { > + err = PTR_ERR(n->state); > + goto err_out_cleanup; > + } > + > + return 0; > + > +err_out_cleanup: > + dst_cleanup_local(n); > +err_out_exit: > + return err; > +} > + > +static int dst_request_remote_config(struct dst_node *n, struct socket *sock) > +{ > + struct dst_remote_request cfg; > + struct msghdr msg; > + struct kvec iov; > + int err; > + > + memset(&cfg, 0, sizeof(struct dst_remote_request)); > + cfg.cmd = cpu_to_be32(DST_REMOTE_CFG); > + > + iov.iov_base = &cfg; > + iov.iov_len = sizeof(struct dst_remote_request); > + > + msg.msg_iov = (struct iovec *)&iov; > + msg.msg_iovlen = 1; > + msg.msg_name = NULL; > + msg.msg_namelen = 0; > + msg.msg_control = NULL; > + msg.msg_controllen = 0; > + msg.msg_flags = MSG_WAITALL; > + > + err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); > + if (err <= 0) { > + if (err == 0) > + err = -ECONNRESET; > + return err; > + } > + > + iov.iov_base = &cfg; > + iov.iov_len = sizeof(struct dst_remote_request); > + > + msg.msg_iov = (struct iovec *)&iov; > + msg.msg_iovlen = 1; > + msg.msg_name = NULL; > + msg.msg_namelen = 0; > + msg.msg_control = NULL; > + msg.msg_controllen = 0; > + msg.msg_flags = MSG_WAITALL; > + > + err = kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags); > + if (err <= 0) { > + if (err == 0) > + err = -ECONNRESET; > + return err; > + } > + > + if (be32_to_cpu(cfg.cmd) != DST_REMOTE_CFG) > + return -EINVAL; > + > + n->size = be64_to_cpu(cfg.sector); > + > + return 0; > +} > + > +static int dst_setup_remote(struct dst_node *n, struct dst_ctl *ctl, > + struct dst_remote_ctl *r) > +{ > + int err; > + struct socket *sock; > + > + err = sock_create(r->addr.sa_family, r->type, r->proto, &sock); > + if (err < 0) > + goto err_out_exit; > + > + sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo = > + msecs_to_jiffies(DST_DEFAULT_TIMEO); > + > + err = sock->ops->connect(sock, (struct sockaddr *)&r->addr, > + r->addr.sa_data_len, 0); > + if (err) > + goto err_out_destroy; > + > + if (!n->size) { > + err = dst_request_remote_config(n, sock); > + if (err) > + goto err_out_destroy; > + } > + > + n->state = kst_data_state_init(n, sock); > + if (IS_ERR(n->state)) { > + err = PTR_ERR(n->state); > + goto err_out_destroy; > + } > + > + return 0; > + > +err_out_destroy: > + sock_release(sock); > +err_out_exit: > + return err; > +} > + > +/* > + * This function inserts node into storage. > + */ > +static int dst_insert_node(struct dst_node *n) > +{ > + int err; > + struct dst_storage *st = n->st; > + struct dst_node *dn; > + > + err = st->alg->ops->add_node(n); > + if (err) > + return err; > + > + err = dst_node_sysfs_init(n); > + if (err) > + goto err_out_remove_node; > + > + mutex_lock(&st->tree_lock); > + dn = dst_storage_tree_add(n, st); > + if (dn) { > + err = -EINVAL; > + dn->size = st->disk_size; > + if (dn->start == n->start) { > + err = 0; > + n->shared_head = dst_node_get(dn); > + atomic_inc(&dn->shared_num); > + list_add_tail_rcu(&n->shared, &dn->shared); And this too is under the update-side mutex, so is OK. > + } > + } > + mutex_unlock(&st->tree_lock); > + if (err) > + goto err_out_sysfs_exit; > + > + if (n->priv_callback) > + n->priv_callback(n); > + > + return 0; > + > +err_out_sysfs_exit: > + dst_node_sysfs_exit(n); > +err_out_remove_node: > + st->alg->ops->del_node(n); > + return err; > +} > + > +static struct dst_node *dst_alloc_node(struct dst_ctl *ctl, > + void (*cleanup)(struct dst_node *)) > +{ > + struct dst_storage *st; > + struct dst_node *n; > + > + st = dst_get_storage(ctl->st, ctl->alg, 1); > + if (!st) > + goto err_out_exit; > + > + n = kzalloc(sizeof(struct dst_node), GFP_KERNEL); > + if (!n) > + goto err_out_put_storage; > + > + n->w = kst_main_worker; > + n->st = st; > + n->cleanup = cleanup; > + n->start = ctl->start; > + n->size = ctl->size; > + INIT_LIST_HEAD(&n->shared); > + n->shared_head = NULL; > + atomic_set(&n->shared_num, 0); > + atomic_set(&n->refcnt, 1); > + > + return n; > + > +err_out_put_storage: > + mutex_lock(&dst_storage_lock); > + list_del_init(&st->entry); > + mutex_unlock(&dst_storage_lock); > + > + dst_put_storage(st); > +err_out_exit: > + return NULL; > +} > + > +/* > + * Control callback for userspace commands to setup > + * different nodes and start/stop array. > + */ > +static int dst_add_remote(struct dst_ctl *ctl, void __user *data) > +{ > + struct dst_node *n; > + int err; > + struct dst_remote_ctl rctl; > + > + if (copy_from_user(&rctl, data, sizeof(struct dst_remote_ctl))) > + return -EFAULT; > + > + n = dst_alloc_node(ctl, &dst_cleanup_remote); > + if (!n) > + return -ENOMEM; > + > + err = dst_setup_remote(n, ctl, &rctl); > + if (err < 0) > + goto err_out_free; > + > + err = dst_insert_node(n); > + if (err) > + goto err_out_free; > + > + return 0; > + > +err_out_free: > + dst_node_put(n); > + return err; > +} > + > +static int dst_add_local_export(struct dst_ctl *ctl, void __user *data) > +{ > + struct dst_node *n; > + int err; > + struct dst_le_template tmp; > + > + if (copy_from_user(&tmp.le, data, sizeof(struct dst_local_export_ctl))) > + return -EFAULT; > + > + tmp.data = data + sizeof(struct dst_local_export_ctl); > + > + n = dst_alloc_node(ctl, &dst_cleanup_local_export); > + if (!n) > + return -EINVAL; > + > + err = dst_setup_local_export(n, ctl, &tmp); > + if (err < 0) > + goto err_out_free; > + > + err = dst_insert_node(n); > + if (err) > + goto err_out_free; > + > + > + return 0; > + > +err_out_free: > + dst_node_put(n); > + return err; > +} > + > +static int dst_add_local(struct dst_ctl *ctl, void __user *data) > +{ > + struct dst_node *n; > + int err; > + struct dst_local_ctl lctl; > + > + if (copy_from_user(&lctl, data, sizeof(struct dst_local_ctl))) > + return -EFAULT; > + > + n = dst_alloc_node(ctl, &dst_cleanup_local); > + if (!n) > + return -EINVAL; > + > + err = dst_setup_local(n, ctl, &lctl); > + if (err < 0) > + goto err_out_free; > + > + err = dst_insert_node(n); > + if (err) > + goto err_out_free; > + > + return 0; > + > +err_out_free: > + dst_node_put(n); > + return err; > +} > + > +static int dst_del_node(struct dst_ctl *ctl, void __user *data) > +{ > + struct dst_node *n; > + struct dst_storage *st; > + int err = -ENODEV; > + > + st = dst_get_storage(ctl->st, ctl->alg, 0); > + if (!st) > + goto err_out_exit; > + > + mutex_lock(&st->tree_lock); > + n = dst_storage_tree_del(st, ctl->start); > + mutex_unlock(&st->tree_lock); > + if (!n) > + goto err_out_put; > + > + dst_node_put(n); > + dst_put_storage(st); > + > + return 0; > + > +err_out_put: > + dst_put_storage(st); > +err_out_exit: > + return err; > +} > + > +static int dst_start_storage(struct dst_ctl *ctl, void __user *data) > +{ > + struct dst_storage *st; > + > + st = dst_get_storage(ctl->st, ctl->alg, 0); > + if (!st) > + return -ENODEV; > + > + mutex_lock(&st->tree_lock); > + if (!(st->flags & DST_ST_STARTED)) { > + set_capacity(st->disk, st->disk_size); > + add_disk(st->disk); > + st->flags |= DST_ST_STARTED; > + dprintk("%s: STARTED st: %p, disk_size: %llu.\n", > + __func__, st, st->disk_size); > + } > + mutex_unlock(&st->tree_lock); > + > + dst_put_storage(st); > + > + return 0; > +} > + > +static int dst_stop_storage(struct dst_ctl *ctl, void __user *data) > +{ > + struct dst_storage *st; > + > + st = dst_get_storage(ctl->st, ctl->alg, 0); > + if (!st) > + return -ENODEV; > + > + dprintk("%s: STOPPED storage: %s.\n", __func__, st->name); > + > + dst_storage_sysfs_exit(st); > + > + mutex_lock(&dst_storage_lock); > + list_del_init(&st->entry); > + mutex_unlock(&dst_storage_lock); > + > + if (st->flags & DST_ST_STARTED) > + dst_remove_disk(st); > + > + dst_remove_all_nodes(st); > + dst_put_storage(st); /* One reference got above */ > + dst_put_storage(st); /* Another reference set during initialization */ > + > + return 0; > +} > + > +typedef int (*dst_command_func)(struct dst_ctl *ctl, void __user *data); > + > +/* > + * List of userspace commands. > + */ > +static dst_command_func dst_commands[] = { > + [DST_ADD_REMOTE] = &dst_add_remote, > + [DST_ADD_LOCAL] = &dst_add_local, > + [DST_ADD_LOCAL_EXPORT] = &dst_add_local_export, > + [DST_DEL_NODE] = &dst_del_node, > + [DST_START_STORAGE] = &dst_start_storage, > + [DST_STOP_STORAGE] = &dst_stop_storage, > +}; > + > +/* > + * Move to connector for configuration is in TODO list. > + */ > +static int dst_ioctl(struct inode *inode, struct file *file, > + unsigned int command, unsigned long data) > +{ > + struct dst_ctl ctl; > + unsigned int cmd = _IOC_NR(command); > + > + if (!capable(CAP_SYS_ADMIN)) > + return -EACCES; > + > + if (_IOC_TYPE(command) != DST_IOCTL) > + return -ENOTTY; > + > + if (cmd >= DST_CMD_MAX) > + return -EINVAL; > + > + if (copy_from_user(&ctl, (void __user *)data, sizeof(struct dst_ctl))) > + return -EFAULT; > + > + data += sizeof(struct dst_ctl); > + > + return dst_commands[cmd](&ctl, (void __user *)data); > +} > + > +static const struct file_operations dst_fops = { > + .ioctl = dst_ioctl, > + .owner = THIS_MODULE, > +}; > + > +static struct miscdevice dst_misc = { > + .minor = MISC_DYNAMIC_MINOR, > + .name = DST_NAME, > + .fops = &dst_fops > +}; > + > +static int dst_sysfs_init(void) > +{ > + return bus_register(&dst_dev_bus_type); > +} > + > +static void dst_sysfs_exit(void) > +{ > + bus_unregister(&dst_dev_bus_type); > +} > + > +static int __devinit dst_sys_init(void) > +{ > + int err = -ENOMEM; > + > + dst_request_cache = kmem_cache_create("dst", sizeof(struct dst_request), > + 0, 0, NULL, NULL); > + if (!dst_request_cache) > + return -ENOMEM; > + > + dst_bio_set = bioset_create(32, 32); > + if (!dst_bio_set) > + goto err_out_destroy; > + > + err = register_blkdev(dst_major, DST_NAME); > + if (err < 0) > + goto err_out_destroy_bioset; > + if (err) > + dst_major = err; > + > + err = dst_sysfs_init(); > + if (err) > + goto err_out_unregister; > + > + kst_main_worker = kst_worker_init(0); > + if (IS_ERR(kst_main_worker)) { > + err = PTR_ERR(kst_main_worker); > + goto err_out_sysfs_exit; > + } > + > + err = misc_register(&dst_misc); > + if (err) > + goto err_out_worker_exit; > + > + return 0; > + > +err_out_worker_exit: > + kst_worker_exit(kst_main_worker); > +err_out_sysfs_exit: > + dst_sysfs_exit(); > +err_out_unregister: > + unregister_blkdev(dst_major, DST_NAME); > +err_out_destroy_bioset: > + bioset_free(dst_bio_set); > +err_out_destroy: > + kmem_cache_destroy(dst_request_cache); > + return err; > +} > + > +static void __devexit dst_sys_exit(void) > +{ > + misc_deregister(&dst_misc); > + dst_sysfs_exit(); > + unregister_blkdev(dst_major, DST_NAME); > + kst_exit_all(); > + bioset_free(dst_bio_set); > + kmem_cache_destroy(dst_request_cache); > +} > + > +module_init(dst_sys_init); > +module_exit(dst_sys_exit); > + > +MODULE_DESCRIPTION("Distributed storage"); > +MODULE_AUTHOR("Evgeniy Polyakov "); > +MODULE_LICENSE("GPL"); > diff --git a/drivers/block/dst/kst.c b/drivers/block/dst/kst.c > new file mode 100644 > index 0000000..b739402 > --- /dev/null > +++ b/drivers/block/dst/kst.c > @@ -0,0 +1,1609 @@ > +/* > + * 2007+ Copyright (c) Evgeniy Polyakov > + * All rights reserved. > + * > + * This program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU General Public License as published by > + * the Free Software Foundation; either version 2 of the License, or > + * (at your option) any later version. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU General Public License for more details. > + */ > + > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > + > +#include > + > +struct kst_poll_helper > +{ > + poll_table pt; > + struct kst_state *st; > +}; > + > +static LIST_HEAD(kst_worker_list); > +static DEFINE_MUTEX(kst_worker_mutex); > + > +/* > + * This function creates bound socket for local export node. > + */ > +static int kst_sock_create(struct kst_state *st, struct saddr *addr, > + int type, int proto, int backlog) > +{ > + int err; > + > + err = sock_create(addr->sa_family, type, proto, &st->socket); > + if (err) > + goto err_out_exit; > + > + err = st->socket->ops->bind(st->socket, (struct sockaddr *)addr, > + addr->sa_data_len); > + > + err = st->socket->ops->listen(st->socket, backlog); > + if (err) > + goto err_out_release; > + > + st->socket->sk->sk_allocation = GFP_NOIO; > + > + return 0; > + > +err_out_release: > + sock_release(st->socket); > +err_out_exit: > + return err; > +} > + > +static void kst_sock_release(struct kst_state *st) > +{ > + if (st->socket) { > + sock_release(st->socket); > + st->socket = NULL; > + } > +} > + > +void kst_wake(struct kst_state *st) > +{ > + struct kst_worker *w = st->node->w; > + unsigned long flags; > + > + spin_lock_irqsave(&w->ready_lock, flags); > + if (list_empty(&st->ready_entry)) > + list_add_tail(&st->ready_entry, &w->ready_list); > + spin_unlock_irqrestore(&w->ready_lock, flags); > + > + wake_up(&w->wait); > +} > +EXPORT_SYMBOL_GPL(kst_wake); > + > +/* > + * Polling machinery. > + */ > +static int kst_state_wake_callback(wait_queue_t *wait, unsigned mode, > + int sync, void *key) > +{ > + struct kst_state *st = container_of(wait, struct kst_state, wait); > + kst_wake(st); > + return 1; > +} > + > +static void kst_queue_func(struct file *file, wait_queue_head_t *whead, > + poll_table *pt) > +{ > + struct kst_state *st = container_of(pt, struct kst_poll_helper, pt)->st; > + > + st->whead = whead; > + init_waitqueue_func_entry(&st->wait, kst_state_wake_callback); > + add_wait_queue(whead, &st->wait); > +} > + > +static void kst_poll_exit(struct kst_state *st) > +{ > + if (st->whead) { > + remove_wait_queue(st->whead, &st->wait); > + st->whead = NULL; > + } > +} > + > +/* > + * This function removes request from state tree and ordering list. > + */ > +void kst_del_req(struct dst_request *req) > +{ > + struct kst_state *st = req->state; > + > + rb_erase(&req->request_entry, &st->request_root); > + RB_CLEAR_NODE(&req->request_entry); > + list_del_init(&req->request_list_entry); > +} > +EXPORT_SYMBOL_GPL(kst_del_req); > + > +static struct dst_request *kst_req_first(struct kst_state *st) > +{ > + struct dst_request *req = NULL; > + > + if (!list_empty(&st->request_list)) > + req = list_entry(st->request_list.next, struct dst_request, > + request_list_entry); > + return req; > +} > + > +/* > + * This function dequeues first request from the queue and tree. > + */ > +static struct dst_request *kst_dequeue_req(struct kst_state *st) > +{ > + struct dst_request *req; > + > + mutex_lock(&st->request_lock); > + req = kst_req_first(st); > + if (req) > + kst_del_req(req); > + mutex_unlock(&st->request_lock); > + return req; > +} > + > +static inline int dst_compare_request_id(struct dst_request *old, > + struct dst_request *new) > +{ > + int cmd = 0; > + > + if (old->start + to_sector(old->orig_size) <= new->start) > + cmd = 1; > + if (old->start >= new->start + to_sector(new->orig_size)) > + cmd = -1; > + > + dprintk("%s: old: op: %lu, start: %llu, size: %llu, off: %u, " > + "new: op: %lu, start: %llu, size: %llu, off: %u, cmp: %d.\n", > + __func__, bio_rw(old->bio), old->start, old->orig_size, > + old->offset, > + bio_rw(new->bio), new->start, new->orig_size, > + new->offset, cmd); > + > + return cmd; > +} > + > +/* > + * This function enqueues request into tree, indexed by start of the request, > + * and also puts request into ordered queue. > + */ > +int kst_enqueue_req(struct kst_state *st, struct dst_request *req) > +{ > + struct rb_node **n = &st->request_root.rb_node, *parent = NULL; > + struct dst_request *old = NULL; > + int cmp, err = 0; > + > + while (*n) { > + parent = *n; > + old = rb_entry(parent, struct dst_request, request_entry); > + > + cmp = dst_compare_request_id(old, req); > + if (cmp < 0) > + n = &parent->rb_left; > + else if (cmp > 0) > + n = &parent->rb_right; > + else { > + printk("%s: [%c] old_req: %p, start: %llu, " > + "size: %llu.\n", > + __func__, > + (bio_rw(old->bio) == WRITE)?'W':'R', > + old, old->start, old->orig_size); > + err = -EEXIST; > + break; > + } > + } > + > + if (!err) { > + rb_link_node(&req->request_entry, parent, n); > + rb_insert_color(&req->request_entry, &st->request_root); > + } > + > + if (req->size != req->orig_size) > + list_add(&req->request_list_entry, &st->request_list); > + else > + list_add_tail(&req->request_list_entry, &st->request_list); > + return err; > +} > +EXPORT_SYMBOL_GPL(kst_enqueue_req); > + > +/* > + * BIOs for local exporting node are freed via this function. > + */ > +static void kst_export_put_bio(struct bio *bio) > +{ > + int i; > + struct bio_vec *bv; > + > + dprintk("%s: bio: %p, size: %u, idx: %d, num: %d.\n", > + __func__, bio, bio->bi_size, bio->bi_idx, > + bio->bi_vcnt); > + > + bio_for_each_segment(bv, bio, i) > + __free_page(bv->bv_page); > + bio_put(bio); > +} > + > +/* > + * This is a generic request completion function for requests, > + * queued for async processing. > + * If it is local export node, state machine is different, > + * see details below. > + */ > +void kst_complete_req(struct dst_request *req, int err) > +{ > + dprintk("%s: bio: %p, req: %p, size: %llu, orig_size: %llu, " > + "bi_size: %u, err: %d, flags: %u.\n", > + __func__, req->bio, req, req->size, req->orig_size, > + req->bio->bi_size, err, req->flags); > + > + if (req->flags & DST_REQ_EXPORT) { > + if (req->flags & DST_REQ_EXPORT_WRITE) { > + req->bio->bi_rw = WRITE; > + generic_make_request(req->bio); > + } else > + kst_export_put_bio(req->bio); > + } else { > + req->bio_endio(req, err); > + } > + dst_free_request(req); > +} > +EXPORT_SYMBOL_GPL(kst_complete_req); > + > +static void kst_flush_requests(struct kst_state *st) > +{ > + struct dst_request *req; > + > + while ((req = kst_dequeue_req(st)) != NULL) > + kst_complete_req(req, -EIO); > +} > + > +static int kst_poll_init(struct kst_state *st) > +{ > + struct kst_poll_helper ph; > + > + ph.st = st; > + init_poll_funcptr(&ph.pt, &kst_queue_func); > + > + st->socket->ops->poll(NULL, st->socket, &ph.pt); > + return 0; > +} > + > +/* > + * Main state creation function. > + * It creates new state according to given operations > + * and links it into worker structure and node. > + */ > +static struct kst_state *kst_state_init(struct dst_node *node, > + unsigned int permissions, > + struct kst_state_ops *ops, void *data) > +{ > + struct kst_state *st; > + int err; > + > + st = kzalloc(sizeof(struct kst_state), GFP_KERNEL); > + if (!st) > + return ERR_PTR(-ENOMEM); > + > + st->permissions = permissions; > + st->node = node; > + st->ops = ops; > + INIT_LIST_HEAD(&st->ready_entry); > + INIT_LIST_HEAD(&st->entry); > + st->request_root.rb_node = NULL; > + INIT_LIST_HEAD(&st->request_list); > + mutex_init(&st->request_lock); > + > + err = st->ops->init(st, data); > + if (err) > + goto err_out_free; > + mutex_lock(&node->w->state_mutex); > + list_add_tail(&st->entry, &node->w->state_list); > + mutex_unlock(&node->w->state_mutex); > + > + kst_wake(st); > + > + return st; > + > +err_out_free: > + kfree(st); > + return ERR_PTR(err); > +} > + > +/* > + * This function is called when node is removed, > + * or when state is destroyed for connected to local exporting > + * node client. > + */ > +void kst_state_exit(struct kst_state *st) > +{ > + struct kst_worker *w = st->node->w; > + > + dprintk("%s: st: %p.\n", __func__, st); > + > + mutex_lock(&w->state_mutex); > + list_del_init(&st->entry); > + mutex_unlock(&w->state_mutex); > + > + st->ops->exit(st); > + > + st->node->state = NULL; > + > + kfree(st); > +} > + > +static int kst_error(struct kst_state *st, int err) > +{ > + if ((err == -ECONNRESET || err == -EPIPE) && st->ops->recovery(st, err)) > + err = st->ops->recovery(st, err); > + > + return st->node->st->alg->ops->error(st, err); > +} > + > +/* > + * This is main state processing function. > + * It tries to complete request and invoke appropriate > + * callbacks in case of errors or successfull operation finish. > + */ > +static int kst_thread_process_state(struct kst_state *st) > +{ > + int err, empty; > + unsigned int revents; > + struct dst_request *req, *tmp; > + > + mutex_lock(&st->request_lock); > + if (st->ops->ready) { > + err = st->ops->ready(st); > + if (err) { > + mutex_unlock(&st->request_lock); > + if (err < 0) > + kst_state_exit(st); > + return err; > + } > + } > + > + err = 0; > + empty = 1; > + req = NULL; > + list_for_each_entry_safe(req, tmp, &st->request_list, > + request_list_entry) { > + empty = 0; > + revents = st->socket->ops->poll(st->socket->file, > + st->socket, NULL); > + dprintk("\n%s: st: %p, revents: %x.\n", __func__, st, revents); > + if (!revents) > + break; > + err = req->callback(req, revents); > + dprintk("%s: callback returned, st: %p, err: %d.\n", > + __func__, st, err); > + if (err) > + break; > + } > + mutex_unlock(&st->request_lock); > + > + dprintk("%s: req: %p, err: %d.\n", __func__, req, err); > + if (err < 0) { > + err = kst_error(st, err); > + if (err && (st != st->node->state)) { > + dprintk("%s: err: %d, st: %p, node->state: %p.\n", > + __func__, err, st, st->node->state); > + /* > + * Accepted client has state not related to storage > + * node, so it must be freed explicitely. > + */ > + > + kst_state_exit(st); > + return err; > + } > + > + kst_wake(st); > + } > + > + if (list_empty(&st->request_list) && !empty) > + kst_wake(st); > + > + return err; > +} > + > +/* > + * Main worker thread - one per storage. > + */ > +static int kst_thread_func(void *data) > +{ > + struct kst_worker *w = data; > + struct kst_state *st; > + unsigned long flags; > + int err = 0; > + > + while (!kthread_should_stop()) { > + wait_event_interruptible_timeout(w->wait, > + !list_empty(&w->ready_list) || > + kthread_should_stop(), > + HZ); > + > + st = NULL; > + spin_lock_irqsave(&w->ready_lock, flags); > + if (!list_empty(&w->ready_list)) { > + st = list_entry(w->ready_list.next, struct kst_state, > + ready_entry); > + list_del_init(&st->ready_entry); > + } > + spin_unlock_irqrestore(&w->ready_lock, flags); > + > + if (!st) > + continue; > + > + err = kst_thread_process_state(st); > + } > + > + return err; > +} > + > +/* > + * Worker initialization - this object will host andprocess all states, > + * which in turn host requests for remote targets. > + */ > +struct kst_worker *kst_worker_init(int id) > +{ > + struct kst_worker *w; > + int err; > + > + w = kzalloc(sizeof(struct kst_worker), GFP_KERNEL); > + if (!w) > + return ERR_PTR(-ENOMEM); > + > + w->id = id; > + init_waitqueue_head(&w->wait); > + spin_lock_init(&w->ready_lock); > + mutex_init(&w->state_mutex); > + > + INIT_LIST_HEAD(&w->ready_list); > + INIT_LIST_HEAD(&w->state_list); > + > + w->req_pool = mempool_create_slab_pool(256, dst_request_cache); > + if (!w->req_pool) { > + err = -ENOMEM; > + goto err_out_free; > + } > + > + w->thread = kthread_run(&kst_thread_func, w, "kst%d", w->id); > + if (IS_ERR(w->thread)) { > + err = PTR_ERR(w->thread); > + goto err_out_destroy; > + } > + > + mutex_lock(&kst_worker_mutex); > + list_add_tail(&w->entry, &kst_worker_list); > + mutex_unlock(&kst_worker_mutex); > + > + return w; > + > +err_out_destroy: > + mempool_destroy(w->req_pool); > +err_out_free: > + kfree(w); > + return ERR_PTR(err); > +} > + > +void kst_worker_exit(struct kst_worker *w) > +{ > + struct kst_state *st, *n; > + > + mutex_lock(&kst_worker_mutex); > + list_del(&w->entry); > + mutex_unlock(&kst_worker_mutex); > + > + kthread_stop(w->thread); > + > + list_for_each_entry_safe(st, n, &w->state_list, entry) { > + kst_state_exit(st); > + } > + > + mempool_destroy(w->req_pool); > + kfree(w); > +} > + > +/* > + * Common state exit callback. > + * Removes itself from worker's list of states, > + * releases socket and flushes all requests. > + */ > +static void kst_common_exit(struct kst_state *st) > +{ > + unsigned long flags; > + > + dprintk("%s: st: %p.\n", __func__, st); > + kst_poll_exit(st); > + > + spin_lock_irqsave(&st->node->w->ready_lock, flags); > + list_del_init(&st->ready_entry); > + spin_unlock_irqrestore(&st->node->w->ready_lock, flags); > + > + kst_sock_release(st); > + kst_flush_requests(st); > +} > + > +/* > + * Listen socket contains security attributes in request_list, > + * so it can not be flushed via usual way. > + */ > +static void kst_listen_flush(struct kst_state *st) > +{ > + struct dst_secure *s, *tmp; > + > + list_for_each_entry_safe(s, tmp, &st->request_list, sec_entry) { > + list_del(&s->sec_entry); > + kfree(s); > + } > +} > + > +static void kst_listen_exit(struct kst_state *st) > +{ > + kst_listen_flush(st); > + kst_common_exit(st); > +} > + > +/* > + * Header sending function - may block. > + */ > +static int kst_data_send_header(struct kst_state *st, > + struct dst_remote_request *r) > +{ > + struct msghdr msg; > + struct kvec iov; > + > + iov.iov_base = r; > + iov.iov_len = sizeof(struct dst_remote_request); > + > + msg.msg_iov = (struct iovec *)&iov; > + msg.msg_iovlen = 1; > + msg.msg_name = NULL; > + msg.msg_namelen = 0; > + msg.msg_control = NULL; > + msg.msg_controllen = 0; > + msg.msg_flags = MSG_WAITALL | MSG_NOSIGNAL; > + > + return kernel_sendmsg(st->socket, &msg, &iov, 1, iov.iov_len); > +} > + > +/* > + * BIO vector receiving function - does not block, but may sleep because > + * of scheduling policy. > + */ > +static int kst_data_recv_bio_vec(struct kst_state *st, struct bio_vec *bv, > + unsigned int offset, unsigned int size) > +{ > + struct msghdr msg; > + struct kvec iov; > + void *kaddr; > + int err; > + > + kaddr = kmap(bv->bv_page); > + > + iov.iov_base = kaddr + bv->bv_offset + offset; > + iov.iov_len = size; > + > + msg.msg_iov = (struct iovec *)&iov; > + msg.msg_iovlen = 1; > + msg.msg_name = NULL; > + msg.msg_namelen = 0; > + msg.msg_control = NULL; > + msg.msg_controllen = 0; > + msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; > + > + err = kernel_recvmsg(st->socket, &msg, &iov, 1, iov.iov_len, > + msg.msg_flags); > + kunmap(bv->bv_page); > + > + return err; > +} > + > +/* > + * BIO vector sending function - does not block, but may sleep because > + * of scheduling policy. > + */ > +static int kst_data_send_bio_vec(struct kst_state *st, struct bio_vec *bv, > + unsigned int offset, unsigned int size) > +{ > + return kernel_sendpage(st->socket, bv->bv_page, > + bv->bv_offset + offset, size, > + MSG_DONTWAIT | MSG_NOSIGNAL); > +} > + > +typedef int (*kst_data_process_bio_vec_t)(struct kst_state *st, > + struct bio_vec *bv, unsigned int offset, unsigned int size); > + > +/* > + * @req: processing request. > + * Contains BIO and all related to its processing info. > + * > + * This function sends or receives requested number of pages from given BIO. > + * > + * In case of errors negative return value is returned and @size, > + * @index and @off are set to the: > + * - number of bytes not yet processed (i.e. the rest of the bytes to be > + * processed). > + * - index of the last bio_vec started to be processed (header sent). > + * - offset of the first byte to be processed in the bio_vec. > + * > + * If there are no errors, zero is returned. > + * -EAGAIN is not an error and is transformed into zero return value, > + * called must check if @size is zero, in that case whole BIO is processed > + * and thus req->bio_endio() can be called, othervise new request must be allocated > + * to be processed later. > + */ > +static int kst_data_process_bio(struct dst_request *req) > +{ > + int err = -ENOSPC, partial = (req->size != req->orig_size); > + struct dst_remote_request r; > + kst_data_process_bio_vec_t func; > + unsigned int cur_size; > + > + r.flags = cpu_to_be32(((unsigned long)req->bio) & 0xffffffff); > + > + if (bio_rw(req->bio) == WRITE) { > + r.cmd = cpu_to_be32(DST_WRITE); > + func = kst_data_send_bio_vec; > + } else { > + r.cmd = cpu_to_be32(DST_READ); > + func = kst_data_recv_bio_vec; > + } > + > + dprintk("%s: start: [%c], start: %llu, idx: %d, num: %d, " > + "size: %llu, offset: %u.\n", > + __func__, (bio_rw(req->bio) == WRITE)?'W':'R', > + req->start, req->idx, req->num, req->size, req->offset); > + > + while (req->idx < req->num) { > + struct bio_vec *bv = bio_iovec_idx(req->bio, req->idx); > + > + cur_size = min_t(u64, bv->bv_len - req->offset, req->size); > + > + if (cur_size == 0) { > + printk("%s: %d/%d: start: %llu, " > + "bv_offset: %u, bv_len: %u, " > + "req_offset: %u, req_size: %llu, " > + "req: %p, bio: %p, err: %d.\n", > + __func__, req->idx, req->num, req->start, > + bv->bv_offset, bv->bv_len, > + req->offset, req->size, > + req, req->bio, err); > + BUG(); > + } > + > + if (!(req->flags & DST_REQ_HEADER_SENT)) { > + r.sector = cpu_to_be64(req->start); > + r.offset = cpu_to_be32(bv->bv_offset + req->offset); > + r.size = cpu_to_be32(cur_size); > + > + err = kst_data_send_header(req->state, &r); > + if (err != sizeof(struct dst_remote_request)) { > + dprintk("%s: %d/%d: header: start: %llu, " > + "bv_offset: %u, bv_len: %u, " > + "a offset: %u, offset: %u, " > + "cur_size: %u, err: %d.\n", > + __func__, req->idx, req->num, > + req->start, bv->bv_offset, bv->bv_len, > + bv->bv_offset + req->offset, > + req->offset, cur_size, err); > + if (err >= 0) > + err = -EINVAL; > + break; > + } > + > + req->flags |= DST_REQ_HEADER_SENT; > + } > + > + err = func(req->state, bv, req->offset, cur_size); > + if (err <= 0) > + break; > + > + req->offset += err; > + req->size -= err; > + > + if (req->offset != bv->bv_len) { > + dprintk("%s: %d/%d: this: start: %llu, bv_offset: %u, " > + "bv_len: %u, a offset: %u, offset: %u, " > + "cur_size: %u, err: %d.\n", > + __func__, req->idx, req->num, req->start, > + bv->bv_offset, bv->bv_len, > + bv->bv_offset + req->offset, > + req->offset, cur_size, err); > + err = -EAGAIN; > + break; > + } > + req->offset = 0; > + req->idx++; > + req->flags &= ~DST_REQ_HEADER_SENT; > + > + req->start += to_sector(bv->bv_len); > + } > + > + if (err <= 0 && err != -EAGAIN) { > + if (err == 0) > + err = -ECONNRESET; > + } else > + err = 0; > + > + if (req->size) { > + req->state->flags |= KST_FLAG_PARTIAL; > + } else if (partial) { > + req->state->flags &= ~KST_FLAG_PARTIAL; > + } > + > + if (err < 0 || (req->idx == req->num && req->size)) { > + dprintk("%s: return: idx: %d, num: %d, offset: %u, " > + "size: %llu, err: %d.\n", > + __func__, req->idx, req->num, req->offset, > + req->size, err); > + } > + dprintk("%s: end: start: %llu, idx: %d, num: %d, " > + "size: %llu, offset: %u.\n", > + __func__, req->start, req->idx, req->num, > + req->size, req->offset); > + > + return err; > +} > + > +void kst_bio_endio(struct dst_request *req, int err) > +{ > + if (err) > + printk("%s: freeing bio: %p, bi_size: %u, " > + "orig_size: %llu, req: %p.\n", > + __func__, req->bio, req->bio->bi_size, req->orig_size, req); > + bio_endio(req->bio, req->orig_size, err); > +} > +EXPORT_SYMBOL_GPL(kst_bio_endio); > + > +/* > + * This callback is invoked by worker thread to process given request. > + */ > +int kst_data_callback(struct dst_request *req, unsigned int revents) > +{ > + int err; > + > + dprintk("%s: req: %p, num: %d, idx: %d, bio: %p, " > + "revents: %x, flags: %x.\n", > + __func__, req, req->num, req->idx, req->bio, > + revents, req->flags); > + > + if (req->flags & DST_REQ_EXPORT_READ) > + return 1; > + > + err = kst_data_process_bio(req); > + if (err < 0) > + goto err_out; > + > + if (!req->size) { > + dprintk("%s: complete: req: %p, bio: %p.\n", > + __func__, req, req->bio); > + kst_del_req(req); > + kst_complete_req(req, 0); > + return 0; > + } > + > + if (revents & (POLLERR | POLLHUP | POLLRDHUP)) { > + err = -EPIPE; > + goto err_out; > + } > + > + return 1; > + > +err_out: > + return err; > +} > +EXPORT_SYMBOL_GPL(kst_data_callback); > + > +#define KST_CONG_COMPLETED (0) > +#define KST_CONG_NOT_FOUND (1) > +#define KST_CONG_QUEUE (-1) > + > +/* > + * kst_congestion - checks for data congestion, i.e. the case, when given > + * block request crosses an area of the another block request which > + * is not yet sent to the remote node. > + * > + * @req: dst request containing block io related information. > + * > + * Return value: > + * %KST_CONG_COMPLETED - congestion was found and processed, > + * bio must be ended, request is completed. > + * %KST_CONG_NOT_FOUND - no congestion found, > + * request must be processed as usual > + * %KST_CONG_QUEUE - congestion has been found, but bio is not completed, > + * new request must be allocated and processed. > + */ > +static int kst_congestion(struct dst_request *req) > +{ > + int cmp, i; > + struct kst_state *st = req->state; > + struct rb_node *n = st->request_root.rb_node; > + struct dst_request *old = NULL, *dst_req, *src_req; > + > + while (n) { > + src_req = rb_entry(n, struct dst_request, request_entry); > + cmp = dst_compare_request_id(src_req, req); > + > + if (cmp < 0) > + n = n->rb_left; > + else if (cmp > 0) > + n = n->rb_right; > + else { > + old = src_req; > + break; > + } > + } > + > + if (likely(!old)) > + return KST_CONG_NOT_FOUND; > + > + dprintk("%s: old: op: %lu, start: %llu, size: %llu, off: %u, " > + "new: op: %lu, start: %llu, size: %llu, off: %u.\n", > + __func__, bio_rw(old->bio), old->start, old->orig_size, > + old->offset, > + bio_rw(req->bio), req->start, req->orig_size, req->offset); > + > + if ((bio_rw(old->bio) != WRITE) && (bio_rw(req->bio) != WRITE)) { > + return KST_CONG_QUEUE; > + } > + > + if (unlikely(req->offset != old->offset)) > + return KST_CONG_QUEUE; > + > + src_req = old; > + dst_req = req; > + if (bio_rw(req->bio) == WRITE) { > + dst_req = old; > + src_req = req; > + } > + > + /* Actually we could partially complete new request by copying > + * part of the first one, but not now, consider this as a > + * (low-priority) todo item. > + */ > + if (src_req->start + src_req->orig_size < > + dst_req->start + dst_req->orig_size) > + return KST_CONG_QUEUE; > + > + /* > + * So, only process if new request is differnt from old one, > + * or subsequent write, i.e.: > + * - not completed write and request to read > + * - not completed read and request to write > + * - not completed write and request to (over)write > + */ > + for (i = old->idx; i < old->num; ++i) { > + struct bio_vec *bv_src, *bv_dst; > + void *src, *dst; > + u64 len; > + > + bv_src = bio_iovec_idx(src_req->bio, i); > + bv_dst = bio_iovec_idx(dst_req->bio, i); > + > + if (unlikely(bv_dst->bv_offset != bv_src->bv_offset)) > + return KST_CONG_QUEUE; > + > + if (unlikely(bv_dst->bv_len != bv_src->bv_len)) > + return KST_CONG_QUEUE; > + > + src = kmap_atomic(bv_src->bv_page, KM_USER0); > + dst = kmap_atomic(bv_dst->bv_page, KM_USER1); > + > + len = min_t(u64, bv_dst->bv_len, dst_req->size); > + > + memcpy(dst + bv_dst->bv_offset, src + bv_src->bv_offset, len); > + > + kunmap_atomic(src, KM_USER0); > + kunmap_atomic(dst, KM_USER1); > + > + dst_req->idx++; > + dst_req->size -= len; > + dst_req->offset = 0; > + dst_req->start += to_sector(len); > + > + if (!dst_req->size) > + break; > + } > + > + if (req == dst_req) > + return KST_CONG_COMPLETED; > + > + kst_del_req(dst_req); > + kst_complete_req(dst_req, 0); > + > + return KST_CONG_NOT_FOUND; > +} > + > +struct dst_request *dst_clone_request(struct dst_request *req, mempool_t *pool) > +{ > + struct dst_request *new_req; > + > + new_req = mempool_alloc(pool, GFP_NOIO); > + if (!new_req) > + return NULL; > + > + memset(new_req, 0, sizeof(struct dst_request)); > + > + dprintk("%s: req: %p, new_req: %p, bio: %p.\n", > + __func__, req, new_req, req->bio); > + > + RB_CLEAR_NODE(&new_req->request_entry); > + > + if (req) { > + new_req->bio = req->bio; > + new_req->state = req->state; > + new_req->node = req->node; > + new_req->idx = req->idx; > + new_req->num = req->num; > + new_req->size = req->size; > + new_req->orig_size = req->orig_size; > + new_req->offset = req->offset; > + new_req->start = req->start; > + new_req->flags = req->flags; > + new_req->bio_endio = req->bio_endio; > + new_req->priv = req->priv; > + } > + > + return new_req; > +} > +EXPORT_SYMBOL_GPL(dst_clone_request); > + > +void dst_free_request(struct dst_request *req) > +{ > + dprintk("%s: free req: %p, pool: %p, bio: %p, state: %p, node: %p.\n", > + __func__, req, req->node->w->req_pool, > + req->bio, req->state, req->node); > + mempool_free(req, req->node->w->req_pool); > +} > +EXPORT_SYMBOL_GPL(dst_free_request); > + > +/* > + * This is main data processing function, eventually invoked from block layer. > + * It tries to complte request, but if it is about to block, it allocates > + * new request and queues it to main worker to be processed when events allow. > + */ > +static int kst_data_push(struct dst_request *req) > +{ > + struct kst_state *st = req->state; > + struct dst_request *new_req; > + unsigned int revents; > + int err, locked = 0; > + > + dprintk("%s: start: %llu, size: %llu, bio: %p.\n", > + __func__, req->start, req->size, req->bio); > + > + if (mutex_trylock(&st->request_lock)) { > + locked = 1; > + > + if (st->flags & (KST_FLAG_PARTIAL | DST_REQ_ALWAYS_QUEUE)) > + goto alloc_new_req; > + > + err = kst_congestion(req); > + if (err == KST_CONG_COMPLETED) { > + err = 0; > + goto out_bio_endio; > + } > + > + if (err == KST_CONG_NOT_FOUND) { > + revents = st->socket->ops->poll(NULL, st->socket, NULL); > + dprintk("%s: st: %p, bio: %p, revents: %x.\n", > + __func__, st, req->bio, revents); > + if (revents & POLLOUT) { > + err = kst_data_process_bio(req); > + if (err < 0) > + goto out_unlock; > + > + if (!req->size) { > + err = 0; > + goto out_bio_endio; > + } > + } > + } > + } > + > +alloc_new_req: > + err = -ENOMEM; > + new_req = dst_clone_request(req, req->node->w->req_pool); > + if (!new_req) > + goto out_unlock; > + > + new_req->callback = &kst_data_callback; > + > + if (!locked) > + mutex_lock(&st->request_lock); > + locked = 1; > + > + err = kst_enqueue_req(st, new_req); > + mutex_unlock(&st->request_lock); > + locked = 0; > + if (err) { > + printk(KERN_NOTICE "%s: congestion [%c], start: %llu, idx: %d," > + " num: %d, size: %llu, offset: %u, err: %d.\n", > + __func__, (bio_rw(req->bio) == WRITE)?'W':'R', > + req->start, req->idx, req->num, req->size, > + req->offset, err); > + } > + > + kst_wake(st); > + > + return 0; > + > +out_bio_endio: > + req->bio_endio(req, err); > +out_unlock: > + if (locked) > + mutex_unlock(&st->request_lock); > + locked = 0; > + > + if (err) { > + err = kst_error(st, err); > + if (!err) > + goto alloc_new_req; > + } > + > + if (err) { > + printk("%s: error [%c], start: %llu, idx: %d, num: %d, " > + "size: %llu, offset: %u, err: %d.\n", > + __func__, (bio_rw(req->bio) == WRITE)?'W':'R', > + req->start, req->idx, req->num, req->size, > + req->offset, err); > + req->bio_endio(req, err); > + } > + > + kst_wake(st); > + return err; > +} > + > +/* > + * Remote node initialization callback. > + */ > +static int kst_data_init(struct kst_state *st, void *data) > +{ > + int err; > + > + st->socket = data; > + st->socket->sk->sk_allocation = GFP_NOIO; > + /* > + * Why not? > + */ > + st->socket->sk->sk_sndbuf = st->socket->sk->sk_sndbuf = 1024*1024*10; > + > + err = kst_poll_init(st); > + if (err) > + return err; > + > + return 0; > +} > + > +/* > + * Remote node recovery function - tries to reconnect to given target. > + */ > +static int kst_data_recovery(struct kst_state *st, int err) > +{ > + struct socket *sock; > + struct sockaddr addr; > + int addrlen; > + struct dst_request *req; > + > + if (err != -ECONNRESET && err != -EPIPE) { > + dprintk("%s: state %p does not know how " > + "to recover from error %d.\n", > + __func__, st, err); > + return err; > + } > + > + err = sock_create(st->socket->ops->family, st->socket->type, > + st->socket->sk->sk_protocol, &sock); > + if (err < 0) > + goto err_out_exit; > + > + sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo = > + msecs_to_jiffies(DST_DEFAULT_TIMEO); > + > + err = sock->ops->getname(st->socket, &addr, &addrlen, 2); > + if (err) > + goto err_out_destroy; > + > + err = sock->ops->connect(sock, &addr, addrlen, 0); > + if (err) > + goto err_out_destroy; > + > + kst_poll_exit(st); > + kst_sock_release(st); > + > + mutex_lock(&st->request_lock); > + err = st->ops->init(st, sock); > + if (!err) { > + /* > + * After reconnection is completed all requests > + * must be resent from the state they were finished previously, > + * but with new headers. > + */ > + list_for_each_entry(req, &st->request_list, request_list_entry) > + req->flags &= ~DST_REQ_HEADER_SENT; > + } > + mutex_unlock(&st->request_lock); > + if (err < 0) > + goto err_out_destroy; > + > + kst_wake(st); > + dprintk("%s: recovery completed.\n", __func__); > + > + return 0; > + > +err_out_destroy: > + sock_release(sock); > +err_out_exit: > + dprintk("%s: revovery failed: st: %p, err: %d.\n", __func__, st, err); > + return err; > +} > + > +static inline void kst_convert_header(struct dst_remote_request *r) > +{ > + r->cmd = be32_to_cpu(r->cmd); > + r->sector = be64_to_cpu(r->sector); > + r->offset = be32_to_cpu(r->offset); > + r->size = be32_to_cpu(r->size); > + r->flags = be32_to_cpu(r->flags); > +} > + > +/* > + * Local exporting node end IO callbacks. > + */ > +static int kst_export_write_end_io(struct bio *bio, unsigned int size, int err) > +{ > + dprintk("%s: bio: %p, size: %u, idx: %d, num: %d, err: %d.\n", > + __func__, bio, bio->bi_size, bio->bi_idx, bio->bi_vcnt, err); > + > + if (bio->bi_size) > + return 1; > + > + kst_export_put_bio(bio); > + return 0; > +} > + > +static int kst_export_read_end_io(struct bio *bio, unsigned int size, int err) > +{ > + struct dst_request *req = bio->bi_private; > + struct kst_state *st = req->state; > + > + dprintk("%s: bio: %p, req: %p, size: %u, idx: %d, num: %d, err: %d.\n", > + __func__, bio, req, bio->bi_size, bio->bi_idx, > + bio->bi_vcnt, err); > + > + if (bio->bi_size) > + return 1; > + > + bio->bi_size = req->size = req->orig_size; > + bio->bi_rw = WRITE; > + req->flags &= ~DST_REQ_EXPORT_READ; > + kst_wake(st); > + return 0; > +} > + > +/* > + * This callback is invoked each time new request from remote > + * node to given local export node is received. > + * It allocates new block IO request and queues it for processing. > + */ > +static int kst_export_ready(struct kst_state *st) > +{ > + struct dst_remote_request r; > + struct msghdr msg; > + struct kvec iov; > + struct bio *bio; > + int err, nr, i; > + struct dst_request *req; > + sector_t data_size; > + unsigned int revents = st->socket->ops->poll(NULL, st->socket, NULL); > + > + if (revents & (POLLERR | POLLHUP)) { > + err = -EPIPE; > + goto err_out_exit; > + } > + > + if (!(revents & POLLIN) || !list_empty(&st->request_list)) > + return 0; > + > + iov.iov_base = &r; > + iov.iov_len = sizeof(struct dst_remote_request); > + > + msg.msg_iov = (struct iovec *)&iov; > + msg.msg_iovlen = 1; > + msg.msg_name = NULL; > + msg.msg_namelen = 0; > + msg.msg_control = NULL; > + msg.msg_controllen = 0; > + msg.msg_flags = MSG_WAITALL | MSG_NOSIGNAL; > + > + err = kernel_recvmsg(st->socket, &msg, &iov, 1, > + iov.iov_len, msg.msg_flags); > + if (err != sizeof(struct dst_remote_request)) { > + err = -EINVAL; > + goto err_out_exit; > + } > + > + kst_convert_header(&r); > + > + dprintk("\n%s: cmd: %u, sector: %llu, size: %u, " > + "flags: %x, offset: %u.\n", > + __func__, r.cmd, r.sector, r.size, r.flags, r.offset); > + > + err = -EINVAL; > + if (r.cmd != DST_READ && r.cmd != DST_WRITE && r.cmd != DST_REMOTE_CFG) > + goto err_out_exit; > + > + data_size = get_capacity(st->node->bdev->bd_disk); > + if ((signed)(r.sector + to_sector(r.size)) < 0 || > + (signed)(r.sector + to_sector(r.size)) > data_size || > + (signed)r.sector > data_size) > + goto err_out_exit; > + > + if (r.cmd == DST_REMOTE_CFG) { > + r.sector = data_size; > + kst_convert_header(&r); > + > + iov.iov_base = &r; > + iov.iov_len = sizeof(struct dst_remote_request); > + > + msg.msg_iov = (struct iovec *)&iov; > + msg.msg_iovlen = 1; > + msg.msg_name = NULL; > + msg.msg_namelen = 0; > + msg.msg_control = NULL; > + msg.msg_controllen = 0; > + msg.msg_flags = MSG_WAITALL | MSG_NOSIGNAL; > + > + err = kernel_sendmsg(st->socket, &msg, &iov, 1, iov.iov_len); > + if (err != sizeof(struct dst_remote_request)) { > + err = -EINVAL; > + goto err_out_exit; > + } > + kst_wake(st); > + return 0; > + } > + > + nr = r.size/PAGE_SIZE + 1; > + > + while (r.size) { > + int nr_pages = min(BIO_MAX_PAGES, nr); > + unsigned int size; > + struct page *page; > + > + err = -ENOMEM; > + req = dst_clone_request(NULL, st->node->w->req_pool); > + if (!req) > + goto err_out_exit; > + > + dprintk("%s: alloc req: %p, pool: %p.\n", > + __func__, req, st->node->w->req_pool); > + > + bio = bio_alloc(GFP_NOIO, nr_pages); > + if (!bio) > + goto err_out_free_req; > + > + req->flags = DST_REQ_EXPORT | DST_REQ_HEADER_SENT; > + req->bio = bio; > + req->state = st; > + req->node = st->node; > + req->callback = &kst_data_callback; > + req->bio_endio = &kst_bio_endio; > + > + /* > + * Yes, looks a bit weird. > + * Logic is simple - for local exporting node all operations > + * are reversed compared to usual nodes, since usual nodes > + * process remote data and local export node process remote > + * requests, so that writing data means sending data to > + * remote node and receiving on the local export one. > + * > + * So, to process writing to the exported node we need first > + * to receive data from the net (i.e. to perform READ > + * operationin terms of usual node), and then put it to the > + * storage (WRITE command, so it will be changed before > + * calling generic_make_request()). > + * > + * To process read request from the exported node we need > + * first to read it from storage (READ command for BIO) > + * and then send it over the net (perform WRITE operation > + * in terms of network). > + */ > + if (r.cmd == DST_WRITE) { > + req->flags |= DST_REQ_EXPORT_WRITE; > + bio->bi_end_io = kst_export_write_end_io; > + } else { > + req->flags |= DST_REQ_EXPORT_READ; > + bio->bi_end_io = kst_export_read_end_io; > + } > + bio->bi_rw = READ; > + bio->bi_private = req; > + bio->bi_sector = r.sector; > + bio->bi_bdev = st->node->bdev; > + > + for (i = 0; i < nr_pages; ++i) { > + page = alloc_page(GFP_NOIO); > + if (!page) > + break; > + > + size = min_t(u32, PAGE_SIZE, r.size); > + > + err = bio_add_page(bio, page, size, r.offset); > + dprintk("%s: %d/%d: page: %p, size: %u, offset: %u, " > + "err: %d.\n", > + __func__, i, nr_pages, page, size, > + r.offset, err); > + if (err <= 0) > + break; > + > + if (err == size) { > + r.offset = 0; > + nr--; > + } else { > + r.offset += err; > + } > + > + r.size -= err; > + r.sector += to_sector(err); > + > + if (!r.size) > + break; > + } > + > + if (!bio->bi_vcnt) { > + err = -ENOMEM; > + goto err_out_put; > + } > + > + req->size = req->orig_size = bio->bi_size; > + req->start = bio->bi_sector; > + req->idx = 0; > + req->num = bio->bi_vcnt; > + > + dprintk("%s: submitting: bio: %p, req: %p, start: %llu, " > + "size: %llu, idx: %d, num: %d, offset: %u, err: %d.\n", > + __func__, bio, req, req->start, req->size, > + req->idx, req->num, req->offset, err); > + > + err = kst_enqueue_req(st, req); > + if (err) > + goto err_out_put; > + > + if (r.cmd == DST_READ) { > + generic_make_request(bio); > + } > + } > + > + kst_wake(st); > + return 0; > + > +err_out_put: > + bio_put(bio); > +err_out_free_req: > + dst_free_request(req); > +err_out_exit: > + dprintk("%s: error: %d.\n", __func__, err); > + return err; > +} > + > +static void kst_export_exit(struct kst_state *st) > +{ > + struct dst_node *n = st->node; > + > + dprintk("%s: st: %p.\n", __func__, st); > + > + kst_common_exit(st); > + dst_node_put(n); > +} > + > +static struct kst_state_ops kst_data_export_ops = { > + .init = &kst_data_init, > + .push = &kst_data_push, > + .exit = &kst_export_exit, > + .ready = &kst_export_ready, > +}; > + > +/* > + * This callback is invoked each time listening socket for > + * given local export node becomes ready. > + * It creates new state for connected client and queues for processing. > + */ > +static int kst_listen_ready(struct kst_state *st) > +{ > + struct socket *newsock; > + struct saddr addr; > + struct kst_state *newst; > + int err; > + unsigned int revents, permissions = 0; > + struct dst_secure *s; > + > + revents = st->socket->ops->poll(NULL, st->socket, NULL); > + if (!(revents & POLLIN)) > + return 1; > + > + err = sock_create(st->socket->ops->family, st->socket->type, > + st->socket->sk->sk_protocol, &newsock); > + if (err) > + goto err_out_exit; > + > + err = st->socket->ops->accept(st->socket, newsock, 0); > + if (err) > + goto err_out_put; > + > + if (newsock->ops->getname(newsock, (struct sockaddr *)&addr, > + (int *)&addr.sa_data_len, 2) < 0) { > + err = -ECONNABORTED; > + goto err_out_put; > + } > + > + list_for_each_entry(s, &st->request_list, sec_entry) { > + void *sec_addr, *new_addr; > + > + sec_addr = ((void *)&s->sec.addr) + s->sec.check_offset; > + new_addr = ((void *)&addr) + s->sec.check_offset; > + > + if (!memcmp(sec_addr, new_addr, > + addr.sa_data_len - s->sec.check_offset)) { > + permissions = s->sec.permissions; > + break; > + } > + } > + > + /* > + * So far only reading and writing are supported. > + * Block device does not know about anything else, > + * but as far as I recall, there was a prognosis, > + * that computer will never require more than 640kb of RAM. > + */ > + if (permissions == 0) { > + err = -EPERM; > + goto err_out_put; > + } > + > + if (st->socket->ops->family == AF_INET) { > + struct sockaddr_in *sin = (struct sockaddr_in *)&addr; > + printk(KERN_INFO "%s: Client: %u.%u.%u.%u:%d.\n", __func__, > + NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port)); > + } else if (st->socket->ops->family == AF_INET6) { > + struct sockaddr_in6 *sin = (struct sockaddr_in6 *)&addr; > + printk(KERN_INFO "%s: Client: " > + "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d", > + __func__, > + NIP6(sin->sin6_addr), ntohs(sin->sin6_port)); > + } > + > + dst_node_get(st->node); > + newst = kst_state_init(st->node, permissions, > + &kst_data_export_ops, newsock); > + if (IS_ERR(newst)) { > + err = PTR_ERR(newst); > + goto err_out_put; > + } > + > + /* > + * Negative return value means error, positive - stop this state > + * processing. Zero allows to check state for pending requests. > + * Listening socket contains security objects in request list, > + * since it does not have any requests. > + */ > + return 1; > + > +err_out_put: > + sock_release(newsock); > +err_out_exit: > + return 1; > +} > + > +static int kst_listen_init(struct kst_state *st, void *data) > +{ > + int err = -ENOMEM, i; > + struct dst_le_template *tmp = data; > + struct dst_secure *s; > + > + for (i=0; ile.secure_attr_num; ++i) { > + s = kmalloc(sizeof(struct dst_secure), GFP_KERNEL); > + if (!s) > + goto err_out_exit; > + > + if (copy_from_user(&s->sec, tmp->data, > + sizeof(struct dst_secure_user))) { > + kfree(s); > + err = -EFAULT; > + goto err_out_exit; > + } > + > + list_add_tail(&s->sec_entry, &st->request_list); > + tmp->data += sizeof(struct dst_secure_user); > + > + if (s->sec.addr.sa_family == AF_INET) { > + struct sockaddr_in *sin = > + (struct sockaddr_in *)&s->sec.addr; > + printk(KERN_INFO "%s: Client: %u.%u.%u.%u:%d, " > + "permissions: %x.\n", > + __func__, NIPQUAD(sin->sin_addr.s_addr), > + ntohs(sin->sin_port), s->sec.permissions); > + } else if (s->sec.addr.sa_family == AF_INET6) { > + struct sockaddr_in6 *sin = > + (struct sockaddr_in6 *)&s->sec.addr; > + printk(KERN_INFO "%s: Client: " > + "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d, " > + "permissions: %x.\n", > + __func__, NIP6(sin->sin6_addr), > + ntohs(sin->sin6_port), s->sec.permissions); > + } > + } > + > + err = kst_sock_create(st, &tmp->le.rctl.addr, tmp->le.rctl.type, > + tmp->le.rctl.proto, tmp->le.backlog); > + if (err) > + goto err_out_exit; > + > + err = kst_poll_init(st); > + if (err) > + goto err_out_release; > + > + return 0; > + > +err_out_release: > + kst_sock_release(st); > +err_out_exit: > + kst_listen_flush(st); > + return err; > +} > + > +/* > + * Operations for different types of states. > + * There are three: > + * data state - created for remote node, when distributed storage connects > + * to remote node, which contain data. > + * listen state - created for local export node, when remote distributed > + * storage's node connects to given node to get/put data. > + * data export state - created for each client connected to above listen > + * state. > + */ > +static struct kst_state_ops kst_listen_ops = { > + .init = &kst_listen_init, > + .exit = &kst_listen_exit, > + .ready = &kst_listen_ready, > +}; > +static struct kst_state_ops kst_data_ops = { > + .init = &kst_data_init, > + .push = &kst_data_push, > + .exit = &kst_common_exit, > + .recovery = &kst_data_recovery, > +}; > + > +struct kst_state *kst_listener_state_init(struct dst_node *node, > + struct dst_le_template *tmp) > +{ > + return kst_state_init(node, DST_PERM_READ | DST_PERM_WRITE, > + &kst_listen_ops, tmp); > +} > + > +struct kst_state *kst_data_state_init(struct dst_node *node, > + struct socket *newsock) > +{ > + return kst_state_init(node, DST_PERM_READ | DST_PERM_WRITE, > + &kst_data_ops, newsock); > +} > + > +/* > + * Remove all workers and associated states. > + */ > +void kst_exit_all(void) > +{ > + struct kst_worker *w, *n; > + > + list_for_each_entry_safe(w, n, &kst_worker_list, entry) { > + kst_worker_exit(w); > + } > +} > diff --git a/include/linux/dst.h b/include/linux/dst.h > new file mode 100644 > index 0000000..7b0feb1 > --- /dev/null > +++ b/include/linux/dst.h > @@ -0,0 +1,354 @@ > +/* > + * 2007+ Copyright (c) Evgeniy Polyakov > + * All rights reserved. > + * > + * This program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU General Public License as published by > + * the Free Software Foundation; either version 2 of the License, or > + * (at your option) any later version. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU General Public License for more details. > + */ > + > +#ifndef __DST_H > +#define __DST_H > + > +#include > + > +#define DST_NAMELEN 32 > +#define DST_NAME "dst" > +#define DST_IOCTL 0xba > + > +enum { > + DST_DEL_NODE = 0, /* Remove node with given id from storage */ > + DST_ADD_REMOTE, /* Add remote node with given id to the storage */ > + DST_ADD_LOCAL, /* Add local node with given id to the storage */ > + DST_ADD_LOCAL_EXPORT, /* Add local node with given id to the storage to be exported and used by remote peers */ > + DST_START_STORAGE, /* Array is ready and storage can be started, if there will be new nodes > + * added to the storage, they will be checked against existing size and > + * probably be dropped (for example in mirror format when new node has smaller > + * size than array created) or inserted. > + */ > + DST_STOP_STORAGE, /* Remove array and all nodes. */ > + DST_CMD_MAX > +}; > + > +#define DST_CTL_FLAGS_REMOTE (1<<0) > +#define DST_CTL_FLAGS_EXPORT (1<<1) > + > +struct dst_ctl > +{ > + char st[DST_NAMELEN]; > + char alg[DST_NAMELEN]; > + __u32 flags; > + __u64 start, size; > +}; > + > +struct dst_local_ctl > +{ > + char name[DST_NAMELEN]; > +}; > + > +#define SADDR_MAX_DATA 128 > + > +struct saddr { > + unsigned short sa_family; /* address family, AF_xxx */ > + char sa_data[SADDR_MAX_DATA]; /* 14 bytes of protocol address */ > + unsigned short sa_data_len; /* Number of bytes used in sa_data */ > +}; > + > +struct dst_remote_ctl > +{ > + __u16 type; > + __u16 proto; > + struct saddr addr; > +}; > + > +#define DST_PERM_READ (1<<0) > +#define DST_PERM_WRITE (1<<1) > + > +/* > + * Right now it is simple model, where each remote address > + * is assigned to set of permissions it is allowed to perform. > + * In real world block device does not know anything but > + * reading and writing, so it should be more than enough. > + */ > +struct dst_secure_user > +{ > + unsigned int permissions; > + unsigned short check_offset; > + struct saddr addr; > +}; > + > +struct dst_local_export_ctl > +{ > + __u32 backlog; > + int secure_attr_num; > + struct dst_local_ctl lctl; > + struct dst_remote_ctl rctl; > +}; > + > +enum { > + DST_REMOTE_CFG = 1, /* Request remote configuration */ > + DST_WRITE, /* Writing */ > + DST_READ, /* Reading */ > + DST_NCMD_MAX, > +}; > + > +struct dst_remote_request > +{ > + __u32 cmd; > + __u32 flags; > + __u64 sector; > + __u32 offset; > + __u32 size; > +}; > + > +#ifdef __KERNEL__ > + > +#include > +#include > +#include > +#include > +#include > +#include > + > +//#define DST_DEBUG > + > +#ifdef DST_DEBUG > +#define dprintk(f, a...) printk(KERN_NOTICE f, ##a) > +#else > +#define dprintk(f, a...) do {} while (0) > +#endif > + > +struct kst_worker > +{ > + struct list_head entry; > + > + struct list_head state_list; > + struct mutex state_mutex; > + > + struct list_head ready_list; > + spinlock_t ready_lock; > + > + mempool_t *req_pool; > + > + struct task_struct *thread; > + > + wait_queue_head_t wait; > + > + int id; > +}; > + > +struct kst_state; > +struct dst_node; > + > +#define DST_REQ_HEADER_SENT (1<<0) > +#define DST_REQ_EXPORT (1<<1) > +#define DST_REQ_EXPORT_WRITE (1<<2) > +#define DST_REQ_EXPORT_READ (1<<3) > +#define DST_REQ_ALWAYS_QUEUE (1<<4) > + > +struct dst_request > +{ > + struct rb_node request_entry; > + struct list_head request_list_entry; > + struct bio *bio; > + struct kst_state *state; > + struct dst_node *node; > + > + u32 flags; > + > + int (*callback)(struct dst_request *dst, > + unsigned int revents); > + void (*bio_endio)(struct dst_request *dst, > + int err); > + > + void *priv; > + atomic_t refcnt; > + > + u64 size, orig_size, start; > + int idx, num; > + u32 offset; > +}; > + > +struct kst_state_ops > +{ > + int (*init)(struct kst_state *, void *); > + int (*push)(struct dst_request *req); > + int (*ready)(struct kst_state *); > + int (*recovery)(struct kst_state *, int err); > + void (*exit)(struct kst_state *); > +}; > + > +#define KST_FLAG_PARTIAL (1<<0) > + > +struct kst_state > +{ > + struct list_head entry; > + struct list_head ready_entry; > + > + wait_queue_t wait; > + wait_queue_head_t *whead; > + > + struct dst_node *node; > + struct socket *socket; > + > + u32 flags, permissions; > + > + struct rb_root request_root; > + struct mutex request_lock; > + struct list_head request_list; > + > + struct kst_state_ops *ops; > +}; > + > +#define DST_DEFAULT_TIMEO 2000 > + > +struct dst_storage; > + > +struct dst_alg_ops > +{ > + int (*add_node)(struct dst_node *n); > + void (*del_node)(struct dst_node *n); > + int (*remap)(struct dst_request *req); > + int (*error)(struct kst_state *state, int err); > + struct module *owner; > +}; > + > +struct dst_alg > +{ > + struct list_head entry; > + char name[DST_NAMELEN]; > + atomic_t refcnt; > + struct dst_alg_ops *ops; > +}; > + > +#define DST_ST_STARTED (1<<0) > + > +struct dst_storage > +{ > + struct list_head entry; > + char name[DST_NAMELEN]; > + struct dst_alg *alg; > + atomic_t refcnt; > + struct mutex tree_lock; > + struct rb_root tree_root; > + > + request_queue_t *queue; > + struct gendisk *disk; > + > + long flags; > + u64 disk_size; > + > + struct device device; > +}; > + > +#define DST_NODE_FROZEN 0 > +#define DST_NODE_NOTSYNC 1 > + > +struct dst_node > +{ > + struct rb_node tree_node; > + > + struct list_head shared; > + struct dst_node *shared_head; > + > + struct block_device *bdev; > + struct dst_storage *st; > + struct kst_state *state; > + struct kst_worker *w; > + > + atomic_t refcnt; > + atomic_t shared_num; > + > + void (*cleanup)(struct dst_node *); > + > + long flags; > + > + u64 start, size; > + > + void (*priv_callback)(struct dst_node *); > + void *priv; > + > + struct device device; > +}; > + > +struct dst_le_template > +{ > + struct dst_local_export_ctl le; > + void __user *data; > +}; > + > +struct dst_secure > +{ > + struct list_head sec_entry; > + struct dst_secure_user sec; > +}; > + > +void kst_state_exit(struct kst_state *st); > + > +struct kst_worker *kst_worker_init(int id); > +void kst_worker_exit(struct kst_worker *w); > + > +struct kst_state *kst_listener_state_init(struct dst_node *node, > + struct dst_le_template *tmp); > +struct kst_state *kst_data_state_init(struct dst_node *node, > + struct socket *newsock); > + > +void kst_wake(struct kst_state *st); > + > +void kst_exit_all(void); > + > +struct dst_alg *dst_alloc_alg(char *name, struct dst_alg_ops *ops); > +void dst_remove_alg(struct dst_alg *alg); > + > +struct dst_node *dst_storage_tree_search(struct dst_storage *st, u64 start); > + > +void dst_node_put(struct dst_node *n); > + > +static inline struct dst_node *dst_node_get(struct dst_node *n) > +{ > + atomic_inc(&n->refcnt); > + return n; > +} > + > +struct dst_request *dst_clone_request(struct dst_request *req, mempool_t *pool); > +void dst_free_request(struct dst_request *req); > + > +void kst_complete_req(struct dst_request *req, int err); > +void kst_bio_endio(struct dst_request *req, int err); > +void kst_del_req(struct dst_request *req); > +int kst_enqueue_req(struct kst_state *st, struct dst_request *req); > + > +int kst_data_callback(struct dst_request *req, unsigned int revents); > + > +extern struct kmem_cache *dst_request_cache; > + > +static inline sector_t to_sector(unsigned long n) > +{ > + return (n >> 9); > +} > + > +static inline unsigned long to_bytes(sector_t n) > +{ > + return (n << 9); > +} > + > +/* > + * Checks state's permissions. > + * Returns -EPERM if check failed. > + */ > +static inline int kst_check_permissions(struct kst_state *st, struct bio *bio) > +{ > + if ((bio_rw(bio) == WRITE) && !(st->permissions & DST_PERM_WRITE)) > + return -EPERM; > + > + return 0; > +} > + > +#endif /* __KERNEL__ */ > +#endif /* __DST_H */ > > -- > Evgeniy Polyakov > - > To unsubscribe from this list: send the line "unsubscribe netdev" in > the body of a message to majordomo@vger.kernel.org > More majordomo info at http://vger.kernel.org/majordomo-info.html - To unsubscribe from this list: send the line "unsubscribe linux-kernel" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html Please read the FAQ at http://www.tux.org/lkml/