2008-03-06 01:16:19

by Joel Becker

[permalink] [raw]
Subject: [PATCH 0/10] ocfs2: Userspace cluster stack support

These patches allow ocfs2 to use the distributed lock manager in fs/dlm
in conjunction with a userspace cluster stack.

This series builds on the stack-glue series sent previously. It
provides a plug-in for userspace cluster stacks and fs/dlm. Userspace
is responsible for communicating with the cluster stack and feeding
events through the ocfs2_control misc device. ocfs2 is otherwise
agnostic, using the DLM for all of its interaction.

Using the new ocfs2_stack_user plug-in requires a development version of
the ocfs2 tools. This version knows how to load the plug-in and
provides a daemon to interact with the cman cluster stack.

The second-to-last patch enables the plug-in in Kbuild. The last patch
adds Kconfig options to select which ocfs2 cluster plug-ins get built.

The kernel code is also available on the 'stack-user' branch of my git
repository.

View:
http://oss.oracle.com/git/?p=jlbec/linux-2.6.git;a=shortlog;h=stack-user
Pull:
git pull git://oss.oracle.com/git/jlbec/linux-2.6.git stack-user

The tools code is also available via git, on the 'stack-user' branch
of the ocfs2-tools repository.

View:
http://oss.oracle.com/git/?p=ocfs2-tools.git;a=shortlog;h=stack-user
Pull:
git pull git://oss.oracle.com/git/ocfs2-tools.git stack-user


2008-03-06 01:15:24

by Joel Becker

[permalink] [raw]
Subject: [PATCH 08/10] ocfs2: Change mlog_bug_on to BUG_ON in ocfs2_lockid.h

The masklog code is in the o2cb stack, but ocfs2_lockid.h now needs to
be included by the user stack. The BUG() in ocfs2_lock_type_string()
does not need masklog support, so change it to a regular BUG_ON().

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/ocfs2_lockid.h | 2 +-
1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/fs/ocfs2/ocfs2_lockid.h b/fs/ocfs2/ocfs2_lockid.h
index 86f3e37..82c200f 100644
--- a/fs/ocfs2/ocfs2_lockid.h
+++ b/fs/ocfs2/ocfs2_lockid.h
@@ -100,7 +100,7 @@ static char *ocfs2_lock_type_strings[] = {
static inline const char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
{
#ifdef __KERNEL__
- mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type);
+ BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
#endif
return ocfs2_lock_type_strings[type];
}
--
1.5.3.8

2008-03-06 01:16:57

by Joel Becker

[permalink] [raw]
Subject: [PATCH 05/10] ocfs2: Add the local node id to the handshake.

This is the second part of the ocfs2_control handshake. After
negotiating the ocfs2_control protocol, the daemon tells the filesystem
what the local node id is via the SETN message.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/stack_user.c | 222 ++++++++++++++++++++++++++++++++++++++-----------
1 files changed, 173 insertions(+), 49 deletions(-)

diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index a5e58e2..43e6105 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -40,8 +40,18 @@
* unknown, -EINVAL is returned. Once the negotiation is complete, the
* client can start sending messages.
*
- * The T01 protocol only has one message, "DOWN". It has the following
- * syntax:
+ * The T01 protocol only has two messages. First is the "SETN" message.
+ * It has the following syntax:
+ *
+ * SETN<space><8-char-hex-nodenum><newline>
+ *
+ * This is 14 characters.
+ *
+ * The "SETN" message must be the first message following the protocol.
+ * It tells ocfs2_control the local node number.
+ *
+ * Once the local node number has been set, the "DOWN" message can be
+ * sent for node down notification. It has the following syntax:
*
* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
*
@@ -58,11 +68,18 @@
*/
#define OCFS2_CONTROL_PROTO "T01\n"
#define OCFS2_CONTROL_PROTO_LEN 4
+
+/* Handshake states */
#define OCFS2_CONTROL_HANDSHAKE_INVALID (0)
#define OCFS2_CONTROL_HANDSHAKE_READ (1)
-#define OCFS2_CONTROL_HANDSHAKE_VALID (2)
-#define OCFS2_CONTROL_MESSAGE_DOWN "DOWN"
-#define OCFS2_CONTROL_MESSAGE_DOWN_LEN 4
+#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL (2)
+#define OCFS2_CONTROL_HANDSHAKE_VALID (3)
+
+/* Messages */
+#define OCFS2_CONTROL_MESSAGE_OP_LEN 4
+#define OCFS2_CONTROL_MESSAGE_SETNODE_OP "SETN"
+#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN 14
+#define OCFS2_CONTROL_MESSAGE_DOWN_OP "DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN 47
#define OCFS2_TEXT_UUID_LEN 32
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN 8
@@ -79,9 +96,35 @@ struct ocfs2_live_connection {
struct ocfs2_control_private {
struct list_head op_list;
int op_state;
+ int op_this_node;
+};
+
+/* SETN<space><8-char-hex-nodenum><newline> */
+struct ocfs2_control_message_setn {
+ char tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
+ char space;
+ char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
+ char newline;
+};
+
+/* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */
+struct ocfs2_control_message_down {
+ char tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
+ char space1;
+ char uuid[OCFS2_TEXT_UUID_LEN];
+ char space2;
+ char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
+ char newline;
+};
+
+union ocfs2_control_message {
+ char tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
+ struct ocfs2_control_message_setn u_setn;
+ struct ocfs2_control_message_down u_down;
};

static atomic_t ocfs2_control_opened;
+static int ocfs2_control_this_node = -1;

static LIST_HEAD(ocfs2_live_connection_list);
static LIST_HEAD(ocfs2_control_private_list);
@@ -166,38 +209,37 @@ static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
kfree(c);
}

-static ssize_t ocfs2_control_cfu(void *target, size_t target_len,
- const char __user *buf, size_t count)
+static int ocfs2_control_cfu(void *target, size_t target_len,
+ const char __user *buf, size_t count)
{
/* The T01 expects write(2) calls to have exactly one command */
- if (count != target_len)
+ if ((count != target_len) ||
+ (count > sizeof(union ocfs2_control_message)))
return -EINVAL;

if (copy_from_user(target, buf, target_len))
return -EFAULT;

- return count;
+ return 0;
}

-static ssize_t ocfs2_control_validate_handshake(struct file *file,
- const char __user *buf,
- size_t count)
+static ssize_t ocfs2_control_validate_protocol(struct file *file,
+ const char __user *buf,
+ size_t count)
{
ssize_t ret;
char kbuf[OCFS2_CONTROL_PROTO_LEN];

ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
buf, count);
- if (ret != count)
+ if (ret)
return ret;

if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
return -EINVAL;

- atomic_inc(&ocfs2_control_opened);
ocfs2_control_set_handshake_state(file,
- OCFS2_CONTROL_HANDSHAKE_VALID);
-
+ OCFS2_CONTROL_HANDSHAKE_PROTOCOL);

return count;
}
@@ -219,45 +261,92 @@ static void ocfs2_control_send_down(const char *uuid,
mutex_unlock(&ocfs2_control_lock);
}

-/* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */
-struct ocfs2_control_message_down {
- char tag[OCFS2_CONTROL_MESSAGE_DOWN_LEN];
- char space1;
- char uuid[OCFS2_TEXT_UUID_LEN];
- char space2;
- char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
- char newline;
-};
+/*
+ * Called whenever configuration elements are sent to /dev/ocfs2_control.
+ * If all configuration elements are present, try to set the global
+ * values. If not, return -EAGAIN. If there is a problem, return a
+ * different error.
+ */
+static int ocfs2_control_install_private(struct file *file)
+{
+ int rc = 0;
+ int set_p = 1;
+ struct ocfs2_control_private *p = file->private_data;

-static ssize_t ocfs2_control_message(struct file *file,
- const char __user *buf,
- size_t count)
+ BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
+
+ if (p->op_this_node < 0)
+ set_p = 0;
+
+ mutex_lock(&ocfs2_control_lock);
+ if (ocfs2_control_this_node < 0) {
+ if (set_p)
+ ocfs2_control_this_node = p->op_this_node;
+ } else if (ocfs2_control_this_node != p->op_this_node)
+ rc = -EINVAL;
+ mutex_unlock(&ocfs2_control_lock);
+
+ if (!rc && set_p) {
+ /* We set the global values successfully */
+ atomic_inc(&ocfs2_control_opened);
+ ocfs2_control_set_handshake_state(file,
+ OCFS2_CONTROL_HANDSHAKE_VALID);
+ }
+
+ return rc;
+}
+
+static int ocfs2_control_do_setnode_msg(struct file *file,
+ struct ocfs2_control_message_setn *msg)
{
- ssize_t ret;
- char *p = NULL;
long nodenum;
- struct ocfs2_control_message_down msg;
+ char *ptr = NULL;
+ struct ocfs2_control_private *p = file->private_data;

- /* Try to catch padding issues */
- WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
- (sizeof(msg.tag) + sizeof(msg.space1)));
+ if (ocfs2_control_get_handshake_state(file) !=
+ OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
+ return -EINVAL;

- memset(&msg, 0, sizeof(struct ocfs2_control_message_down));
- ret = ocfs2_control_cfu(&msg, OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN,
- buf, count);
- if (ret != count)
- return ret;
+ if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
+ OCFS2_CONTROL_MESSAGE_OP_LEN))
+ return -EINVAL;
+
+ if ((msg->space != ' ') || (msg->newline != '\n'))
+ return -EINVAL;
+ msg->space = msg->newline = '\0';
+
+ nodenum = simple_strtol(msg->nodestr, &ptr, 16);
+ if (!ptr || *ptr)
+ return -EINVAL;
+
+ if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
+ (nodenum > INT_MAX) || (nodenum < 0))
+ return -ERANGE;
+ p->op_this_node = nodenum;
+
+ return ocfs2_control_install_private(file);
+}
+
+static int ocfs2_control_do_down_msg(struct file *file,
+ struct ocfs2_control_message_down *msg)
+{
+ long nodenum;
+ char *p = NULL;
+
+ if (ocfs2_control_get_handshake_state(file) !=
+ OCFS2_CONTROL_HANDSHAKE_VALID)
+ return -EINVAL;

- if (strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN,
- strlen(OCFS2_CONTROL_MESSAGE_DOWN)))
+ if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
+ OCFS2_CONTROL_MESSAGE_OP_LEN))
return -EINVAL;

- if ((msg.space1 != ' ') || (msg.space2 != ' ') ||
- (msg.newline != '\n'))
+ if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
+ (msg->newline != '\n'))
return -EINVAL;
- msg.space1 = msg.space2 = msg.newline = '\0';
+ msg->space1 = msg->space2 = msg->newline = '\0';

- nodenum = simple_strtol(msg.nodestr, &p, 16);
+ nodenum = simple_strtol(msg->nodestr, &p, 16);
if (!p || *p)
return -EINVAL;

@@ -265,9 +354,40 @@ static ssize_t ocfs2_control_message(struct file *file,
(nodenum > INT_MAX) || (nodenum < 0))
return -ERANGE;

- ocfs2_control_send_down(msg.uuid, nodenum);
+ ocfs2_control_send_down(msg->uuid, nodenum);

- return count;
+ return 0;
+}
+
+static ssize_t ocfs2_control_message(struct file *file,
+ const char __user *buf,
+ size_t count)
+{
+ ssize_t ret;
+ union ocfs2_control_message msg;
+
+ /* Try to catch padding issues */
+ WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
+ (sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
+
+ memset(&msg, 0, sizeof(union ocfs2_control_message));
+ ret = ocfs2_control_cfu(&msg, count, buf, count);
+ if (ret)
+ goto out;
+
+ if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
+ !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
+ OCFS2_CONTROL_MESSAGE_OP_LEN))
+ ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
+ else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
+ !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
+ OCFS2_CONTROL_MESSAGE_OP_LEN))
+ ret = ocfs2_control_do_down_msg(file, &msg.u_down);
+ else
+ ret = -EINVAL;
+
+out:
+ return ret ? ret : count;
}

static ssize_t ocfs2_control_write(struct file *file,
@@ -283,10 +403,11 @@ static ssize_t ocfs2_control_write(struct file *file,
break;

case OCFS2_CONTROL_HANDSHAKE_READ:
- ret = ocfs2_control_validate_handshake(file, buf,
- count);
+ ret = ocfs2_control_validate_protocol(file, buf,
+ count);
break;

+ case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
case OCFS2_CONTROL_HANDSHAKE_VALID:
ret = ocfs2_control_message(file, buf, count);
break;
@@ -350,6 +471,8 @@ static int ocfs2_control_release(struct inode *inode, struct file *file)
"an emergency restart!\n");
emergency_restart();
}
+ /* Last valid close clears the node number */
+ ocfs2_control_this_node = -1;
}

out:
@@ -370,6 +493,7 @@ static int ocfs2_control_open(struct inode *inode, struct file *file)
p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
if (!p)
return -ENOMEM;
+ p->op_this_node = -1;

mutex_lock(&ocfs2_control_lock);
file->private_data = p;
--
1.5.3.8

2008-03-06 01:17:41

by Joel Becker

[permalink] [raw]
Subject: [PATCH 03/10] ocfs2: Start the ocfs2_control handshake.

When a control daemon opens the ocfs2_control device, it must perform a
handshake to tell the filesystem it is something capable of monitoring
cluster status. Only after the handshake is complete will the filesystem
allow mounts.

This is the first part of the handshake. The daemon reads all supported
ocfs2_control protocols, then writes in the protocol it will use.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/stack_user.c | 144 +++++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 139 insertions(+), 5 deletions(-)

diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index fdca5d3..ff8d307 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -22,6 +22,7 @@
#include <linux/miscdevice.h>
#include <linux/mutex.h>
#include <linux/reboot.h>
+#include <asm/uaccess.h>

#include "stackglue.h"

@@ -40,6 +41,16 @@
*/

/*
+ * Whether or not the client has done the handshake.
+ * For now, we have just one protocol version.
+ */
+#define OCFS2_CONTROL_PROTO "T01\n"
+#define OCFS2_CONTROL_PROTO_LEN 4
+#define OCFS2_CONTROL_HANDSHAKE_INVALID (0)
+#define OCFS2_CONTROL_HANDSHAKE_READ (1)
+#define OCFS2_CONTROL_HANDSHAKE_VALID (2)
+
+/*
* ocfs2_live_connection is refcounted because the filesystem and
* miscdevice sides can detach in different order. Let's just be safe.
*/
@@ -48,11 +59,30 @@ struct ocfs2_live_connection {
struct ocfs2_cluster_connection *oc_conn;
};

+struct ocfs2_control_private {
+ struct list_head op_list;
+ int op_state;
+};
+
static atomic_t ocfs2_control_opened;

static LIST_HEAD(ocfs2_live_connection_list);
+static LIST_HEAD(ocfs2_control_private_list);
static DEFINE_MUTEX(ocfs2_control_lock);

+static inline void ocfs2_control_set_handshake_state(struct file *file,
+ int state)
+{
+ struct ocfs2_control_private *p = file->private_data;
+ p->op_state = state;
+}
+
+static inline int ocfs2_control_get_handshake_state(struct file *file)
+{
+ struct ocfs2_control_private *p = file->private_data;
+ return p->op_state;
+}
+
static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
{
size_t len = strlen(name);
@@ -119,27 +149,115 @@ static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
kfree(c);
}

+static ssize_t ocfs2_control_cfu(char *target, size_t target_len,
+ const char __user *buf, size_t count)
+{
+ /* The T01 expects write(2) calls to have exactly one command */
+ if (count != target_len)
+ return -EINVAL;
+
+ if (copy_from_user(target, buf, target_len))
+ return -EFAULT;
+
+ return count;
+}
+
+static ssize_t ocfs2_control_validate_handshake(struct file *file,
+ const char __user *buf,
+ size_t count)
+{
+ ssize_t ret;
+ char kbuf[OCFS2_CONTROL_PROTO_LEN];
+
+ ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
+ buf, count);
+ if (ret != count)
+ return ret;
+
+ if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
+ return -EINVAL;
+
+ atomic_inc(&ocfs2_control_opened);
+ ocfs2_control_set_handshake_state(file,
+ OCFS2_CONTROL_HANDSHAKE_VALID);
+
+
+ return count;
+}
+

static ssize_t ocfs2_control_write(struct file *file,
const char __user *buf,
size_t count,
loff_t *ppos)
{
- return 0;
+ ssize_t ret;
+
+ switch (ocfs2_control_get_handshake_state(file)) {
+ case OCFS2_CONTROL_HANDSHAKE_INVALID:
+ ret = -EINVAL;
+ break;
+
+ case OCFS2_CONTROL_HANDSHAKE_READ:
+ ret = ocfs2_control_validate_handshake(file, buf,
+ count);
+ break;
+
+ case OCFS2_CONTROL_HANDSHAKE_VALID:
+ ret = count; /* XXX */
+ break;
+
+ default:
+ BUG();
+ ret = -EIO;
+ break;
+ }
+
+ return ret;
}

+/*
+ * This is a naive version. If we ever have a new protocol, we'll expand
+ * it. Probably using seq_file.
+ */
static ssize_t ocfs2_control_read(struct file *file,
char __user *buf,
size_t count,
loff_t *ppos)
{
- return 0;
+ char *proto_string = OCFS2_CONTROL_PROTO;
+ size_t to_write = 0;
+
+ if (*ppos >= OCFS2_CONTROL_PROTO_LEN)
+ return 0;
+
+ to_write = OCFS2_CONTROL_PROTO_LEN - *ppos;
+ if (to_write > count)
+ to_write = count;
+ if (copy_to_user(buf, proto_string + *ppos, to_write))
+ return -EFAULT;
+
+ *ppos += to_write;
+
+ /* Have we read the whole protocol list? */
+ if (*ppos >= OCFS2_CONTROL_PROTO_LEN)
+ ocfs2_control_set_handshake_state(file,
+ OCFS2_CONTROL_HANDSHAKE_READ);
+
+ return to_write;
}

static int ocfs2_control_release(struct inode *inode, struct file *file)
{
+ struct ocfs2_control_private *p = file->private_data;
+
+ mutex_lock(&ocfs2_control_lock);
+
+ if (ocfs2_control_get_handshake_state(file) !=
+ OCFS2_CONTROL_HANDSHAKE_VALID)
+ goto out;
+
if (atomic_dec_and_test(&ocfs2_control_opened)) {
- mutex_lock(&ocfs2_control_lock);
if (!list_empty(&ocfs2_live_connection_list)) {
/* XXX: Do bad things! */
printk(KERN_ERR
@@ -148,15 +266,31 @@ static int ocfs2_control_release(struct inode *inode, struct file *file)
"an emergency restart!\n");
emergency_restart();
}
- mutex_unlock(&ocfs2_control_lock);
}

+out:
+ list_del_init(&p->op_list);
+ file->private_data = NULL;
+
+ mutex_unlock(&ocfs2_control_lock);
+
+ kfree(p);
+
return 0;
}

static int ocfs2_control_open(struct inode *inode, struct file *file)
{
- atomic_inc(&ocfs2_control_opened);
+ struct ocfs2_control_private *p;
+
+ p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
+ if (!p)
+ return -ENOMEM;
+
+ mutex_lock(&ocfs2_control_lock);
+ file->private_data = p;
+ list_add(&p->op_list, &ocfs2_control_private_list);
+ mutex_unlock(&ocfs2_control_lock);

return 0;
}
--
1.5.3.8

2008-03-06 01:18:12

by Joel Becker

[permalink] [raw]
Subject: [PATCH 01/10] ocfs2: Add the user stack module.

Add a skeleton for the stack_user module. It's just the barebones module
code.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/stack_user.c | 38 ++++++++++++++++++++++++++++++++++++++
1 files changed, 38 insertions(+), 0 deletions(-)
create mode 100644 fs/ocfs2/stack_user.c

diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
new file mode 100644
index 0000000..920eb11
--- /dev/null
+++ b/fs/ocfs2/stack_user.c
@@ -0,0 +1,38 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * stack_user.c
+ *
+ * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
+ *
+ * Copyright (C) 2007 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation, version 2.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ */
+
+#include <linux/module.h>
+
+#include "stackglue.h"
+
+
+static int __init user_stack_init(void)
+{
+ return 0;
+}
+
+static void __exit user_stack_exit(void)
+{
+}
+
+MODULE_AUTHOR("Oracle");
+MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
+MODULE_LICENSE("GPL");
+module_init(user_stack_init);
+module_exit(user_stack_exit);
--
1.5.3.8

2008-03-06 01:18:44

by Joel Becker

[permalink] [raw]
Subject: [PATCH 09/10] ocfs2: Add kbuild for ocfs2_stack_user.ko

Add ocfs2_stack_user.ko to the Makefile so that it builds.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/Makefile | 7 ++++++-
1 files changed, 6 insertions(+), 1 deletions(-)

diff --git a/fs/ocfs2/Makefile b/fs/ocfs2/Makefile
index b734254..b8d6d02 100644
--- a/fs/ocfs2/Makefile
+++ b/fs/ocfs2/Makefile
@@ -2,7 +2,11 @@ EXTRA_CFLAGS += -Ifs/ocfs2

EXTRA_CFLAGS += -DCATCH_BH_JBD_RACES

-obj-$(CONFIG_OCFS2_FS) += ocfs2.o ocfs2_stackglue.o ocfs2_stack_o2cb.o
+obj-$(CONFIG_OCFS2_FS) += \
+ ocfs2.o \
+ ocfs2_stackglue.o \
+ ocfs2_stack_o2cb.o \
+ ocfs2_stack_user.o

ocfs2-objs := \
alloc.o \
@@ -33,6 +37,7 @@ ocfs2-objs := \

ocfs2_stackglue-objs := stackglue.o
ocfs2_stack_o2cb-objs := stack_o2cb.o
+ocfs2_stack_user-objs := stack_user.o

obj-$(CONFIG_OCFS2_FS) += cluster/
obj-$(CONFIG_OCFS2_FS) += dlm/
--
1.5.3.8

2008-03-06 01:19:33

by Joel Becker

[permalink] [raw]
Subject: [PATCH 04/10] ocfs2: Introduce the DOWN message to ocfs2_control

When the control daemon sees a node go down, it sends a DOWN message
through the ocfs2_control device.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/stack_user.c | 94 ++++++++++++++++++++++++++++++++++++++++++++++---
1 files changed, 89 insertions(+), 5 deletions(-)

diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index ff8d307..a5e58e2 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -35,9 +35,21 @@
* of output is a supported protocol tag. All protocol tags are a single
* character followed by a two hex digit version number. Currently the
* only things supported is T01, for "Text-base version 0x01". Next, the
- * client writes the version they would like to use. If the version tag
- * written is unknown, -EINVAL is returned. Once the negotiation is
- * complete, the client can start sending messages.
+ * client writes the version they would like to use, including the newline.
+ * Thus, the protocol tag is 'T01\n'. If the version tag written is
+ * unknown, -EINVAL is returned. Once the negotiation is complete, the
+ * client can start sending messages.
+ *
+ * The T01 protocol only has one message, "DOWN". It has the following
+ * syntax:
+ *
+ * DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
+ *
+ * eg:
+ *
+ * DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
+ *
+ * This is 47 characters.
*/

/*
@@ -49,6 +61,11 @@
#define OCFS2_CONTROL_HANDSHAKE_INVALID (0)
#define OCFS2_CONTROL_HANDSHAKE_READ (1)
#define OCFS2_CONTROL_HANDSHAKE_VALID (2)
+#define OCFS2_CONTROL_MESSAGE_DOWN "DOWN"
+#define OCFS2_CONTROL_MESSAGE_DOWN_LEN 4
+#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN 47
+#define OCFS2_TEXT_UUID_LEN 32
+#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN 8

/*
* ocfs2_live_connection is refcounted because the filesystem and
@@ -149,7 +166,7 @@ static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
kfree(c);
}

-static ssize_t ocfs2_control_cfu(char *target, size_t target_len,
+static ssize_t ocfs2_control_cfu(void *target, size_t target_len,
const char __user *buf, size_t count)
{
/* The T01 expects write(2) calls to have exactly one command */
@@ -185,6 +202,73 @@ static ssize_t ocfs2_control_validate_handshake(struct file *file,
return count;
}

+static void ocfs2_control_send_down(const char *uuid,
+ int nodenum)
+{
+ struct ocfs2_live_connection *c;
+
+ mutex_lock(&ocfs2_control_lock);
+
+ c = ocfs2_connection_find(uuid);
+ if (c) {
+ BUG_ON(c->oc_conn == NULL);
+ c->oc_conn->cc_recovery_handler(nodenum,
+ c->oc_conn->cc_recovery_data);
+ }
+
+ mutex_unlock(&ocfs2_control_lock);
+}
+
+/* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */
+struct ocfs2_control_message_down {
+ char tag[OCFS2_CONTROL_MESSAGE_DOWN_LEN];
+ char space1;
+ char uuid[OCFS2_TEXT_UUID_LEN];
+ char space2;
+ char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
+ char newline;
+};
+
+static ssize_t ocfs2_control_message(struct file *file,
+ const char __user *buf,
+ size_t count)
+{
+ ssize_t ret;
+ char *p = NULL;
+ long nodenum;
+ struct ocfs2_control_message_down msg;
+
+ /* Try to catch padding issues */
+ WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
+ (sizeof(msg.tag) + sizeof(msg.space1)));
+
+ memset(&msg, 0, sizeof(struct ocfs2_control_message_down));
+ ret = ocfs2_control_cfu(&msg, OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN,
+ buf, count);
+ if (ret != count)
+ return ret;
+
+ if (strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN,
+ strlen(OCFS2_CONTROL_MESSAGE_DOWN)))
+ return -EINVAL;
+
+ if ((msg.space1 != ' ') || (msg.space2 != ' ') ||
+ (msg.newline != '\n'))
+ return -EINVAL;
+ msg.space1 = msg.space2 = msg.newline = '\0';
+
+ nodenum = simple_strtol(msg.nodestr, &p, 16);
+ if (!p || *p)
+ return -EINVAL;
+
+ if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
+ (nodenum > INT_MAX) || (nodenum < 0))
+ return -ERANGE;
+
+ ocfs2_control_send_down(msg.uuid, nodenum);
+
+ return count;
+}

static ssize_t ocfs2_control_write(struct file *file,
const char __user *buf,
@@ -204,7 +288,7 @@ static ssize_t ocfs2_control_write(struct file *file,
break;

case OCFS2_CONTROL_HANDSHAKE_VALID:
- ret = count; /* XXX */
+ ret = ocfs2_control_message(file, buf, count);
break;

default:
--
1.5.3.8

2008-03-06 01:20:00

by Joel Becker

[permalink] [raw]
Subject: [PATCH 02/10] ocfs2: Add the ocfs2_control misc device.

The ocfs2_control misc device is how a userspace control daemon (controld)
talks to the filesystem. Introduce the bare-bones filesystem ops.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/stack_user.c | 184 ++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 183 insertions(+), 1 deletions(-)

diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index 920eb11..fdca5d3 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -18,17 +18,199 @@
*/

#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/miscdevice.h>
+#include <linux/mutex.h>
+#include <linux/reboot.h>

#include "stackglue.h"


-static int __init user_stack_init(void)
+/*
+ * The control protocol starts with a handshake. Until the handshake
+ * is complete, the control device will fail all write(2)s.
+ *
+ * The handshake is simple. First, the client reads until EOF. Each line
+ * of output is a supported protocol tag. All protocol tags are a single
+ * character followed by a two hex digit version number. Currently the
+ * only things supported is T01, for "Text-base version 0x01". Next, the
+ * client writes the version they would like to use. If the version tag
+ * written is unknown, -EINVAL is returned. Once the negotiation is
+ * complete, the client can start sending messages.
+ */
+
+/*
+ * ocfs2_live_connection is refcounted because the filesystem and
+ * miscdevice sides can detach in different order. Let's just be safe.
+ */
+struct ocfs2_live_connection {
+ struct list_head oc_list;
+ struct ocfs2_cluster_connection *oc_conn;
+};
+
+static atomic_t ocfs2_control_opened;
+
+static LIST_HEAD(ocfs2_live_connection_list);
+static DEFINE_MUTEX(ocfs2_control_lock);
+
+static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
+{
+ size_t len = strlen(name);
+ struct ocfs2_live_connection *c;
+
+ BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
+
+ list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
+ if ((c->oc_conn->cc_namelen == len) &&
+ !strncmp(c->oc_conn->cc_name, name, len))
+ return c;
+ }
+
+ return c;
+}
+
+/*
+ * ocfs2_live_connection structures are created underneath the ocfs2
+ * mount path. Since the VFS prevents multiple calls to
+ * fill_super(), we can't get dupes here.
+ */
+static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn,
+ struct ocfs2_live_connection **c_ret)
+{
+ int rc = 0;
+ struct ocfs2_live_connection *c;
+
+ c = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
+ if (!c)
+ return -ENOMEM;
+
+ mutex_lock(&ocfs2_control_lock);
+ c->oc_conn = conn;
+
+ if (atomic_read(&ocfs2_control_opened))
+ list_add(&c->oc_list, &ocfs2_live_connection_list);
+ else {
+ printk(KERN_ERR
+ "ocfs2: Userspace control daemon is not present\n");
+ rc = -ESRCH;
+ }
+
+ mutex_unlock(&ocfs2_control_lock);
+
+ if (!rc)
+ *c_ret = c;
+ else
+ kfree(c);
+
+ return rc;
+}
+
+/*
+ * This function disconnects the cluster connection from ocfs2_control.
+ * Afterwards, userspace can't affect the cluster connection.
+ */
+static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
+{
+ mutex_lock(&ocfs2_control_lock);
+ list_del_init(&c->oc_list);
+ c->oc_conn = NULL;
+ mutex_unlock(&ocfs2_control_lock);
+
+ kfree(c);
+}
+
+
+static ssize_t ocfs2_control_write(struct file *file,
+ const char __user *buf,
+ size_t count,
+ loff_t *ppos)
{
return 0;
}

+static ssize_t ocfs2_control_read(struct file *file,
+ char __user *buf,
+ size_t count,
+ loff_t *ppos)
+{
+ return 0;
+}
+
+static int ocfs2_control_release(struct inode *inode, struct file *file)
+{
+ if (atomic_dec_and_test(&ocfs2_control_opened)) {
+ mutex_lock(&ocfs2_control_lock);
+ if (!list_empty(&ocfs2_live_connection_list)) {
+ /* XXX: Do bad things! */
+ printk(KERN_ERR
+ "ocfs2: Unexpected release of ocfs2_control!\n"
+ " Loss of cluster connection requires "
+ "an emergency restart!\n");
+ emergency_restart();
+ }
+ mutex_unlock(&ocfs2_control_lock);
+ }
+
+ return 0;
+}
+
+static int ocfs2_control_open(struct inode *inode, struct file *file)
+{
+ atomic_inc(&ocfs2_control_opened);
+
+ return 0;
+}
+
+static const struct file_operations ocfs2_control_fops = {
+ .open = ocfs2_control_open,
+ .release = ocfs2_control_release,
+ .read = ocfs2_control_read,
+ .write = ocfs2_control_write,
+ .owner = THIS_MODULE,
+};
+
+struct miscdevice ocfs2_control_device = {
+ .minor = MISC_DYNAMIC_MINOR,
+ .name = "ocfs2_control",
+ .fops = &ocfs2_control_fops,
+};
+
+static int ocfs2_control_init(void)
+{
+ int rc;
+
+ atomic_set(&ocfs2_control_opened, 0);
+
+ rc = misc_register(&ocfs2_control_device);
+ if (rc)
+ printk(KERN_ERR
+ "ocfs2: Unable to register ocfs2_control device "
+ "(errno %d)\n",
+ -rc);
+
+ return rc;
+}
+
+static void ocfs2_control_exit(void)
+{
+ int rc;
+
+ rc = misc_deregister(&ocfs2_control_device);
+ if (rc)
+ printk(KERN_ERR
+ "ocfs2: Unable to deregister ocfs2_control device "
+ "(errno %d)\n",
+ -rc);
+}
+
+static int __init user_stack_init(void)
+{
+ return ocfs2_control_init();
+}
+
static void __exit user_stack_exit(void)
{
+ ocfs2_control_exit();
}

MODULE_AUTHOR("Oracle");
--
1.5.3.8

2008-03-06 01:20:46

by Joel Becker

[permalink] [raw]
Subject: [PATCH 06/10] ocfs2: Add the 'set version' message to the ocfs2_control device.

The "SETV" message sets the filesystem locking protocol version as
negotiated by the client. The client negotiates based on the maximum
version advertised in /sys/fs/ocfs2/max_locking_protocol.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/stack_user.c | 131 ++++++++++++++++++++++++++++++++++++++++++++-----
1 files changed, 119 insertions(+), 12 deletions(-)

diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index 43e6105..9faa678 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -40,7 +40,7 @@
* unknown, -EINVAL is returned. Once the negotiation is complete, the
* client can start sending messages.
*
- * The T01 protocol only has two messages. First is the "SETN" message.
+ * The T01 protocol has three messages. First is the "SETN" message.
* It has the following syntax:
*
* SETN<space><8-char-hex-nodenum><newline>
@@ -50,8 +50,22 @@
* The "SETN" message must be the first message following the protocol.
* It tells ocfs2_control the local node number.
*
- * Once the local node number has been set, the "DOWN" message can be
- * sent for node down notification. It has the following syntax:
+ * Next comes the "SETV" message. It has the following syntax:
+ *
+ * SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
+ *
+ * This is 11 characters.
+ *
+ * The "SETV" message sets the filesystem locking protocol version as
+ * negotiated by the client. The client negotiates based on the maximum
+ * version advertised in /sys/fs/ocfs2/max_locking_protocol. The major
+ * number from the "SETV" message must match
+ * user_stack.sp_proto->lp_max_version.pv_major, and the minor number
+ * must be less than or equal to ...->lp_max_version.pv_minor.
+ *
+ * Once this information has been set, mounts will be allowed. From this
+ * point on, the "DOWN" message can be sent for node down notification.
+ * It has the following syntax:
*
* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
*
@@ -79,9 +93,12 @@
#define OCFS2_CONTROL_MESSAGE_OP_LEN 4
#define OCFS2_CONTROL_MESSAGE_SETNODE_OP "SETN"
#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN 14
+#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP "SETV"
+#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN 11
#define OCFS2_CONTROL_MESSAGE_DOWN_OP "DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN 47
#define OCFS2_TEXT_UUID_LEN 32
+#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN 2
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN 8

/*
@@ -97,6 +114,7 @@ struct ocfs2_control_private {
struct list_head op_list;
int op_state;
int op_this_node;
+ struct ocfs2_protocol_version op_proto;
};

/* SETN<space><8-char-hex-nodenum><newline> */
@@ -107,6 +125,16 @@ struct ocfs2_control_message_setn {
char newline;
};

+/* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */
+struct ocfs2_control_message_setv {
+ char tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
+ char space1;
+ char major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
+ char space2;
+ char minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
+ char newline;
+};
+
/* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */
struct ocfs2_control_message_down {
char tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
@@ -120,11 +148,13 @@ struct ocfs2_control_message_down {
union ocfs2_control_message {
char tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
struct ocfs2_control_message_setn u_setn;
+ struct ocfs2_control_message_setv u_setv;
struct ocfs2_control_message_down u_down;
};

static atomic_t ocfs2_control_opened;
static int ocfs2_control_this_node = -1;
+static struct ocfs2_protocol_version running_proto;

static LIST_HEAD(ocfs2_live_connection_list);
static LIST_HEAD(ocfs2_control_private_list);
@@ -264,8 +294,9 @@ static void ocfs2_control_send_down(const char *uuid,
/*
* Called whenever configuration elements are sent to /dev/ocfs2_control.
* If all configuration elements are present, try to set the global
- * values. If not, return -EAGAIN. If there is a problem, return a
- * different error.
+ * values. If there is a problem, return an error. Skip any missing
+ * elements, and only bump ocfs2_control_opened when we have all elements
+ * and are successful.
*/
static int ocfs2_control_install_private(struct file *file)
{
@@ -275,15 +306,32 @@ static int ocfs2_control_install_private(struct file *file)

BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);

- if (p->op_this_node < 0)
+ mutex_lock(&ocfs2_control_lock);
+
+ if (p->op_this_node < 0) {
set_p = 0;
+ } else if ((ocfs2_control_this_node >= 0) &&
+ (ocfs2_control_this_node != p->op_this_node)) {
+ rc = -EINVAL;
+ goto out_unlock;
+ }

- mutex_lock(&ocfs2_control_lock);
- if (ocfs2_control_this_node < 0) {
- if (set_p)
- ocfs2_control_this_node = p->op_this_node;
- } else if (ocfs2_control_this_node != p->op_this_node)
+ if (!p->op_proto.pv_major) {
+ set_p = 0;
+ } else if (!list_empty(&ocfs2_live_connection_list) &&
+ ((running_proto.pv_major != p->op_proto.pv_major) ||
+ (running_proto.pv_minor != p->op_proto.pv_minor))) {
rc = -EINVAL;
+ goto out_unlock;
+ }
+
+ if (set_p) {
+ ocfs2_control_this_node = p->op_this_node;
+ running_proto.pv_major = p->op_proto.pv_major;
+ running_proto.pv_minor = p->op_proto.pv_minor;
+ }
+
+out_unlock:
mutex_unlock(&ocfs2_control_lock);

if (!rc && set_p) {
@@ -327,6 +375,56 @@ static int ocfs2_control_do_setnode_msg(struct file *file,
return ocfs2_control_install_private(file);
}

+static int ocfs2_control_do_setversion_msg(struct file *file,
+ struct ocfs2_control_message_setv *msg)
+ {
+ long major, minor;
+ char *ptr = NULL;
+ struct ocfs2_control_private *p = file->private_data;
+ struct ocfs2_protocol_version *max =
+ &user_stack.sp_proto->lp_max_version;
+
+ if (ocfs2_control_get_handshake_state(file) !=
+ OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
+ return -EINVAL;
+
+ if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
+ OCFS2_CONTROL_MESSAGE_OP_LEN))
+ return -EINVAL;
+
+ if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
+ (msg->newline != '\n'))
+ return -EINVAL;
+ msg->space1 = msg->space2 = msg->newline = '\0';
+
+ major = simple_strtol(msg->major, &ptr, 16);
+ if (!ptr || *ptr)
+ return -EINVAL;
+ minor = simple_strtol(msg->minor, &ptr, 16);
+ if (!ptr || *ptr)
+ return -EINVAL;
+
+ /*
+ * The major must be between 1 and 255, inclusive. The minor
+ * must be between 0 and 255, inclusive. The version passed in
+ * must be within the maximum version supported by the filesystem.
+ */
+ if ((major == LONG_MIN) || (major == LONG_MAX) ||
+ (major > (u8)-1) || (major < 1))
+ return -ERANGE;
+ if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
+ (minor > (u8)-1) || (minor < 0))
+ return -ERANGE;
+ if ((major != max->pv_major) ||
+ (minor > max->pv_minor))
+ return -EINVAL;
+
+ p->op_proto.pv_major = major;
+ p->op_proto.pv_minor = minor;
+
+ return ocfs2_control_install_private(file);
+}
+
static int ocfs2_control_do_down_msg(struct file *file,
struct ocfs2_control_message_down *msg)
{
@@ -379,6 +477,10 @@ static ssize_t ocfs2_control_message(struct file *file,
!strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
OCFS2_CONTROL_MESSAGE_OP_LEN))
ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
+ else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
+ !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
+ OCFS2_CONTROL_MESSAGE_OP_LEN))
+ ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
!strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
OCFS2_CONTROL_MESSAGE_OP_LEN))
@@ -471,8 +573,13 @@ static int ocfs2_control_release(struct inode *inode, struct file *file)
"an emergency restart!\n");
emergency_restart();
}
- /* Last valid close clears the node number */
+ /*
+ * Last valid close clears the node number and resets
+ * the locking protocol version (both major and minor)
+ */
ocfs2_control_this_node = -1;
+ running_proto.pv_major = 0;
+ running_proto.pv_minor = 0;
}

out:
--
1.5.3.8

2008-03-06 01:21:16

by Joel Becker

[permalink] [raw]
Subject: [PATCH 10/10] ocfs2: Allow selection of cluster plug-ins.

ocfs2 now supports plug-ins for the classic O2CB stack as well as
userspace cluster stacks in conjunction with fs/dlm. This allows zero,
one, or both of the plug-ins to be selected in Kconfig. For local mounts
(non-clustered), neither plug-in is needed. Both plug-ins can be loaded
at one time; the runtime will select the one needed for the cluster
system in use.

Signed-off-by: Joel Becker <[email protected]>
---
fs/Kconfig | 26 ++++++++++++++++++++++++++
fs/ocfs2/Makefile | 10 ++++++----
2 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/fs/Kconfig b/fs/Kconfig
index d731282..1b0e775 100644
--- a/fs/Kconfig
+++ b/fs/Kconfig
@@ -444,6 +444,32 @@ config OCFS2_FS
For more information on OCFS2, see the file
<file:Documentation/filesystems/ocfs2.txt>.

+config OCFS2_FS_O2CB
+ tristate "O2CB Kernelspace Clustering"
+ depends on OCFS2_FS
+ default y
+ help
+ OCFS2 includes a simple kernelspace clustering package, the OCFS2
+ Cluster Base. It only requires a very small userspace component
+ to configure it. This comes with the standard ocfs2-tools package.
+ O2CB is limited to maintaining a cluster for OCFS2 file systems.
+ It cannot manage any other cluster applications.
+
+ It is always safe to say Y here, as the clustering method is
+ run-time selectable.
+
+config OCFS2_FS_USERSPACE_CLUSTER
+ tristate "OCFS2 Userspace Clustering"
+ depends on OCFS2_FS && DLM
+ default y
+ help
+ This option will allow OCFS2 to use userspace clustering services
+ in conjunction with the DLM in fs/dlm. If you are using a
+ userspace cluster manager, say Y here.
+
+ It is safe to say Y, as the clustering method is run-time
+ selectable.
+
config OCFS2_DEBUG_MASKLOG
bool "OCFS2 logging support"
depends on OCFS2_FS
diff --git a/fs/ocfs2/Makefile b/fs/ocfs2/Makefile
index b8d6d02..f6956de 100644
--- a/fs/ocfs2/Makefile
+++ b/fs/ocfs2/Makefile
@@ -4,9 +4,10 @@ EXTRA_CFLAGS += -DCATCH_BH_JBD_RACES

obj-$(CONFIG_OCFS2_FS) += \
ocfs2.o \
- ocfs2_stackglue.o \
- ocfs2_stack_o2cb.o \
- ocfs2_stack_user.o
+ ocfs2_stackglue.o
+
+obj-$(CONFIG_OCFS2_FS_O2CB) += ocfs2_stack_o2cb.o
+obj-$(CONFIG_OCFS2_FS_USERSPACE_CLUSTER) += ocfs2_stack_user.o

ocfs2-objs := \
alloc.o \
@@ -39,5 +40,6 @@ ocfs2_stackglue-objs := stackglue.o
ocfs2_stack_o2cb-objs := stack_o2cb.o
ocfs2_stack_user-objs := stack_user.o

+# cluster/ is always needed when OCFS2_FS is enabled, for masklog support
obj-$(CONFIG_OCFS2_FS) += cluster/
-obj-$(CONFIG_OCFS2_FS) += dlm/
+obj-$(CONFIG_OCFS2_FS_O2CB) += dlm/
--
1.5.3.8

2008-03-06 01:21:50

by Joel Becker

[permalink] [raw]
Subject: [PATCH 07/10] ocfs2: add fsdlm to stackglue

From: David Teigland <[email protected]>

Add code to use fs/dlm.

[ Modified to be part of the stack_user module -- Joel ]

Signed-off-by: David Teigland <[email protected]>
Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/stack_user.c | 216 ++++++++++++++++++++++++++++++++++++++++++++++++-
fs/ocfs2/stackglue.c | 14 +++-
fs/ocfs2/stackglue.h | 19 ++++-
3 files changed, 243 insertions(+), 6 deletions(-)

diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index 9faa678..9cb2015 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -24,6 +24,7 @@
#include <linux/reboot.h>
#include <asm/uaccess.h>

+#include "ocfs2.h" /* For struct ocfs2_lock_res */
#include "stackglue.h"


@@ -152,6 +153,8 @@ union ocfs2_control_message {
struct ocfs2_control_message_down u_down;
};

+static struct ocfs2_stack_plugin user_stack;
+
static atomic_t ocfs2_control_opened;
static int ocfs2_control_this_node = -1;
static struct ocfs2_protocol_version running_proto;
@@ -344,6 +347,20 @@ out_unlock:
return rc;
}

+static int ocfs2_control_get_this_node(void)
+{
+ int rc;
+
+ mutex_lock(&ocfs2_control_lock);
+ if (ocfs2_control_this_node < 0)
+ rc = -EINVAL;
+ else
+ rc = ocfs2_control_this_node;
+ mutex_unlock(&ocfs2_control_lock);
+
+ return rc;
+}
+
static int ocfs2_control_do_setnode_msg(struct file *file,
struct ocfs2_control_message_setn *msg)
{
@@ -652,13 +669,210 @@ static void ocfs2_control_exit(void)
-rc);
}

+static struct dlm_lksb *fsdlm_astarg_to_lksb(void *astarg)
+{
+ struct ocfs2_lock_res *res = astarg;
+ return &res->l_lksb.lksb_fsdlm;
+}
+
+static void fsdlm_lock_ast_wrapper(void *astarg)
+{
+ struct dlm_lksb *lksb = fsdlm_astarg_to_lksb(astarg);
+ int status = lksb->sb_status;
+
+ BUG_ON(user_stack.sp_proto == NULL);
+
+ /*
+ * For now we're punting on the issue of other non-standard errors
+ * where we can't tell if the unlock_ast or lock_ast should be called.
+ * The main "other error" that's possible is EINVAL which means the
+ * function was called with invalid args, which shouldn't be possible
+ * since the caller here is under our control. Other non-standard
+ * errors probably fall into the same category, or otherwise are fatal
+ * which means we can't carry on anyway.
+ */
+
+ if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
+ user_stack.sp_proto->lp_unlock_ast(astarg, 0);
+ else
+ user_stack.sp_proto->lp_lock_ast(astarg);
+}
+
+static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
+{
+ BUG_ON(user_stack.sp_proto == NULL);
+
+ user_stack.sp_proto->lp_blocking_ast(astarg, level);
+}
+
+static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
+ int mode,
+ union ocfs2_dlm_lksb *lksb,
+ u32 flags,
+ void *name,
+ unsigned int namelen,
+ void *astarg)
+{
+ int ret;
+
+ if (!lksb->lksb_fsdlm.sb_lvbptr)
+ lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
+ sizeof(struct dlm_lksb);
+
+ ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
+ flags|DLM_LKF_NODLCKWT, name, namelen, 0,
+ fsdlm_lock_ast_wrapper, astarg,
+ fsdlm_blocking_ast_wrapper);
+ return ret;
+}
+
+static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
+ union ocfs2_dlm_lksb *lksb,
+ u32 flags,
+ void *astarg)
+{
+ int ret;
+
+ ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
+ flags, &lksb->lksb_fsdlm, astarg);
+ return ret;
+}
+
+static int user_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
+{
+ return lksb->lksb_fsdlm.sb_status;
+}
+
+static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb)
+{
+ return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
+}
+
+static void user_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
+{
+}
+
+/*
+ * Compare a requested locking protocol version against the current one.
+ *
+ * If the major numbers are different, they are incompatible.
+ * If the current minor is greater than the request, they are incompatible.
+ * If the current minor is less than or equal to the request, they are
+ * compatible, and the requester should run at the current minor version.
+ */
+static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
+ struct ocfs2_protocol_version *request)
+{
+ if (existing->pv_major != request->pv_major)
+ return 1;
+
+ if (existing->pv_minor > request->pv_minor)
+ return 1;
+
+ if (existing->pv_minor < request->pv_minor)
+ request->pv_minor = existing->pv_minor;
+
+ return 0;
+}
+
+static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
+{
+ dlm_lockspace_t *fsdlm;
+ struct ocfs2_live_connection *control;
+ int rc = 0;
+
+ BUG_ON(conn == NULL);
+
+ rc = ocfs2_live_connection_new(conn, &control);
+ if (rc)
+ goto out;
+
+ /*
+ * running_proto must have been set before we allowed any mounts
+ * to proceed.
+ */
+ if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
+ printk(KERN_ERR
+ "Unable to mount with fs locking protocol version "
+ "%u.%u because the userspace control daemon has "
+ "negotiated %u.%u\n",
+ conn->cc_version.pv_major, conn->cc_version.pv_minor,
+ running_proto.pv_major, running_proto.pv_minor);
+ rc = -EPROTO;
+ ocfs2_live_connection_drop(control);
+ goto out;
+ }
+
+ rc = dlm_new_lockspace(conn->cc_name, strlen(conn->cc_name),
+ &fsdlm, DLM_LSFL_FS, DLM_LVB_LEN);
+ if (rc) {
+ ocfs2_live_connection_drop(control);
+ goto out;
+ }
+
+ conn->cc_private = control;
+ conn->cc_lockspace = fsdlm;
+out:
+ return rc;
+}
+
+static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn,
+ int hangup_pending)
+{
+ dlm_release_lockspace(conn->cc_lockspace, 2);
+ conn->cc_lockspace = NULL;
+ ocfs2_live_connection_drop(conn->cc_private);
+ conn->cc_private = NULL;
+ return 0;
+}
+
+static int user_cluster_this_node(unsigned int *this_node)
+{
+ int rc;
+
+ rc = ocfs2_control_get_this_node();
+ if (rc < 0)
+ return rc;
+
+ *this_node = rc;
+ return 0;
+}
+
+static struct ocfs2_stack_operations user_stack_ops = {
+ .connect = user_cluster_connect,
+ .disconnect = user_cluster_disconnect,
+ .this_node = user_cluster_this_node,
+ .dlm_lock = user_dlm_lock,
+ .dlm_unlock = user_dlm_unlock,
+ .lock_status = user_dlm_lock_status,
+ .lock_lvb = user_dlm_lvb,
+ .dump_lksb = user_dlm_dump_lksb,
+};
+
+static struct ocfs2_stack_plugin user_stack = {
+ .sp_name = "user",
+ .sp_ops = &user_stack_ops,
+ .sp_owner = THIS_MODULE,
+};
+
+
static int __init user_stack_init(void)
{
- return ocfs2_control_init();
+ int rc;
+
+ rc = ocfs2_control_init();
+ if (!rc) {
+ rc = ocfs2_stack_glue_register(&user_stack);
+ if (rc)
+ ocfs2_control_exit();
+ }
+
+ return rc;
}

static void __exit user_stack_exit(void)
{
+ ocfs2_stack_glue_unregister(&user_stack);
ocfs2_control_exit();
}

diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index bf45d9b..119f60c 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -228,13 +228,20 @@ void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)
EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol);


+/*
+ * The ocfs2_dlm_lock() and ocfs2_dlm_unlock() functions take
+ * "struct ocfs2_lock_res *astarg" instead of "void *astarg" because the
+ * underlying stack plugins need to pilfer the lksb off of the lock_res.
+ * If some other structure needs to be passed as an astarg, the plugins
+ * will need to be given a different avenue to the lksb.
+ */
int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen,
- void *astarg)
+ struct ocfs2_lock_res *astarg)
{
BUG_ON(lproto == NULL);

@@ -246,7 +253,7 @@ EXPORT_SYMBOL_GPL(ocfs2_dlm_lock);
int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
- void *astarg)
+ struct ocfs2_lock_res *astarg)
{
BUG_ON(lproto == NULL);

@@ -360,7 +367,8 @@ void ocfs2_cluster_hangup(const char *group, int grouplen)
BUG_ON(group == NULL);
BUG_ON(group[grouplen] != '\0');

- active_stack->sp_ops->hangup(group, grouplen);
+ if (active_stack->sp_ops->hangup)
+ active_stack->sp_ops->hangup(group, grouplen);

/* cluster_disconnect() was called with hangup_pending==1 */
ocfs2_stack_driver_put();
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index d88bc65..005e4f1 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -26,6 +26,7 @@
#include <linux/dlmconstants.h>

#include "dlm/dlmapi.h"
+#include <linux/dlm.h>

/*
* dlmconstants.h does not have a LOCAL flag. We hope to remove it
@@ -60,6 +61,17 @@ struct ocfs2_locking_protocol {
void (*lp_unlock_ast)(void *astarg, int error);
};

+
+/*
+ * The dlm_lockstatus struct includes lvb space, but the dlm_lksb struct only
+ * has a pointer to separately allocated lvb space. This struct exists only to
+ * include in the lksb union to make space for a combined dlm_lksb and lvb.
+ */
+struct fsdlm_lksb_plus_lvb {
+ struct dlm_lksb lksb;
+ char lvb[DLM_LVB_LEN];
+};
+
/*
* A union of all lock status structures. We define it here so that the
* size of the union is known. Lock status structures are embedded in
@@ -67,6 +79,8 @@ struct ocfs2_locking_protocol {
*/
union ocfs2_dlm_lksb {
struct dlm_lockstatus lksb_o2dlm;
+ struct dlm_lksb lksb_fsdlm;
+ struct fsdlm_lksb_plus_lvb padding;
};

/*
@@ -221,17 +235,18 @@ int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn,
void ocfs2_cluster_hangup(const char *group, int grouplen);
int ocfs2_cluster_this_node(unsigned int *node);

+struct ocfs2_lock_res;
int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen,
- void *astarg);
+ struct ocfs2_lock_res *astarg);
int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
- void *astarg);
+ struct ocfs2_lock_res *astarg);

int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb);
void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb);
--
1.5.3.8