2008-03-06 00:28:43

by Joel Becker

Subject: [PATCH 0/18] ocfs2: Cluster stack glue layer

ocfs2 is currently tied to its internal cluster stack, the ocfs2 Cluster
Base (o2cb). This includes the nodemanager (o2nm) and distributed lock
manager (o2dlm).

Going forward, ocfs2 would like to use the DLM in fs/dlm. This includes
interacting with userspace cluster stacks that drive fs/dlm, allowing
all clustering on a machine to use the same stack.

This patch series decouples the o2cb stack from ocfs2, creating a
plug-in architecture. A layer called "stackglue" sits between the ocfs2
filesystem and the chosen cluster plug-in. A later patch series will
introduce the plug-in for fs/dlm and userspace cluster stacks.

The series should be functionally equivalent. Each patch should compile
and run successfully, with no modification to userspace tools. The end
result of this series is a plug-in cluster backend that behaves exactly
as the current system does. As an added benefit, local (non-clustered)
ocfs2 filesystems do not need to load any plug-in. In the past, even a
non-clustered filesystem required the o2cb modules loaded, though not
configured.

The kernel code is also available on the 'stack-glue' branch of my git
repository.

View:
http://oss.oracle.com/git/?p=jlbec/linux-2.6.git;a=shortlog;h=stack-glue
Pull:
git pull git://oss.oracle.com/git/jlbec/linux-2.6.git stack-glue


2008-03-06 00:29:38

by Joel Becker

Subject: [PATCH 17/18] ocfs2: Add the USERSPACE_STACK incompat bit.

The filesystem gains the USERSPACE_STACK incompat bit and the
s_cluster_info field on the superblock. When a userspace stack is in
use, the name of the stack is stored on-disk for mount-time
verification.
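
As a sanity check on the on-disk layout, here is a minimal userspace
sketch of the new fields. It is not kernel code: the struct and constant
names (cluster_info, sb_tail, SB_TAIL_START and so on) are stand-ins
invented for illustration, and uint32_t/uint64_t replace the kernel's
__le32/__le64. It only mirrors the offsets written in the ocfs2_fs.h
comments of this patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define STACK_LABEL_LEN   4	/* mirrors OCFS2_STACK_LABEL_LEN */
#define CLUSTER_NAME_LEN 16	/* mirrors OCFS2_CLUSTER_NAME_LEN */

/* Userspace mirror of struct ocfs2_cluster_info (hypothetical name). */
struct cluster_info {
	uint8_t  ci_stack[STACK_LABEL_LEN];	/* 0x00 */
	uint32_t ci_reserved;			/* 0x04 */
	uint8_t  ci_cluster[CLUSTER_NAME_LEN];	/* 0x08 */
};						/* 0x18 */

/* Hypothetical view of the superblock tail, starting at offset 0xA0. */
struct sb_tail {
	struct cluster_info s_cluster_info;	/* 0xA0 .. 0xB7 */
	uint64_t            s_reserved2[17];	/* 0xB8 .. 0x13F */
};

enum {
	SB_TAIL_START = 0xA0,	/* offset of s_cluster_info in the superblock */
	ID2_OFFSET    = 0xC0,	/* offset of ocfs2_dinode.id2 in the inode */
	MIN_BLOCKSIZE = 512,	/* smallest supported blocksize */
};

/* End of the superblock, measured from the start of the inode block. */
static size_t sb_end_in_block(void)
{
	return ID2_OFFSET + SB_TAIL_START + sizeof(struct sb_tail);
}
```

Under these assumptions the reserved array ends exactly at byte 512, so
the superblock fills the smallest block with nothing left over.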

The "cluster_stack" option is added to mount(2) processing. The mount
process must pass the matching stack name. If the passed name and
the on-disk name do not match, the mount fails.

When using the classic o2cb stack, the incompat bit is *not* set and no
mount option is used other than the usual heartbeat=local. Thus, the
filesystem is compatible with older tools.
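
The rules above can be modelled in userspace roughly as follows. This is
a sketch, not the kernel function: verify_userspace_stack() and its
parameters are illustrative names, has_incompat_bit stands in for the
USERSPACE_STACK incompat bit, and an empty opt_stack means the
cluster_stack option was not passed.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>

#define STACK_LABEL_LEN 4	/* mirrors OCFS2_STACK_LABEL_LEN */

/*
 * Model of the mount-time check (hypothetical helper).
 *   has_incompat_bit: nonzero if the filesystem has the incompat bit set
 *   disk_stack:       stack label read from the superblock
 *   opt_stack:        cluster_stack= value from mount, "" if absent
 */
static int verify_userspace_stack(int has_incompat_bit,
				  const char *disk_stack,
				  const char *opt_stack)
{
	/* Classic o2cb filesystem: no stack name may be passed. */
	if (!has_incompat_bit && opt_stack[0] != '\0')
		return -EINVAL;

	/* Userspace stack: the passed name must match the on-disk label. */
	if (has_incompat_bit &&
	    strncmp(disk_stack, opt_stack, STACK_LABEL_LEN) != 0)
		return -EINVAL;

	return 0;
}
```

For example, with made-up four-character labels, a filesystem labelled
"pcmk" mounts only when cluster_stack=pcmk is passed, and a classic
filesystem mounts only when no cluster_stack option is given.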

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/ocfs2.h | 7 ++++
fs/ocfs2/ocfs2_fs.h | 40 +++++++++++++++++++++-
fs/ocfs2/super.c | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++-
3 files changed, 134 insertions(+), 3 deletions(-)

diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index af929ec..9ff5811 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -248,6 +248,7 @@ struct ocfs2_super
struct ocfs2_alloc_stats alloc_stats;
char dev_str[20]; /* "major,minor" of the device */

+ char osb_cluster_stack[OCFS2_STACK_LABEL_LEN + 1];
struct ocfs2_cluster_connection *cconn;
struct ocfs2_lock_res osb_super_lockres;
struct ocfs2_lock_res osb_rename_lockres;
@@ -368,6 +369,12 @@ static inline int ocfs2_is_soft_readonly(struct ocfs2_super *osb)
return ret;
}

+static inline int ocfs2_userspace_stack(struct ocfs2_super *osb)
+{
+ return (osb->s_feature_incompat &
+ OCFS2_FEATURE_INCOMPAT_USERSPACE_STACK);
+}
+
static inline int ocfs2_mount_local(struct ocfs2_super *osb)
{
return (osb->s_feature_incompat & OCFS2_FEATURE_INCOMPAT_LOCAL_MOUNT);
diff --git a/fs/ocfs2/ocfs2_fs.h b/fs/ocfs2/ocfs2_fs.h
index c495023..52c4266 100644
--- a/fs/ocfs2/ocfs2_fs.h
+++ b/fs/ocfs2/ocfs2_fs.h
@@ -89,7 +89,8 @@
#define OCFS2_FEATURE_INCOMPAT_SUPP (OCFS2_FEATURE_INCOMPAT_LOCAL_MOUNT \
| OCFS2_FEATURE_INCOMPAT_SPARSE_ALLOC \
| OCFS2_FEATURE_INCOMPAT_INLINE_DATA \
- | OCFS2_FEATURE_INCOMPAT_EXTENDED_SLOT_MAP)
+ | OCFS2_FEATURE_INCOMPAT_EXTENDED_SLOT_MAP \
+ | OCFS2_FEATURE_INCOMPAT_USERSPACE_STACK)
#define OCFS2_FEATURE_RO_COMPAT_SUPP OCFS2_FEATURE_RO_COMPAT_UNWRITTEN

/*
@@ -131,6 +132,17 @@


/*
+ * Support for alternate, userspace cluster stacks. If set, the superblock
+ * field s_cluster_info contains a tag for the alternate stack in use as
+ * well as the name of the cluster being joined.
+ * mount.ocfs2 must pass in a matching stack name.
+ *
+ * If not set, the classic stack will be used. This is compatible with
+ * all older versions.
+ */
+#define OCFS2_FEATURE_INCOMPAT_USERSPACE_STACK 0x0080
+
+/*
* backup superblock flag is used to indicate that this volume
* has backup superblocks.
*/
@@ -272,6 +284,10 @@ struct ocfs2_new_group_input {
#define OCFS2_VOL_UUID_LEN 16
#define OCFS2_MAX_VOL_LABEL_LEN 64

+/* The alternate, userspace stack fields */
+#define OCFS2_STACK_LABEL_LEN 4
+#define OCFS2_CLUSTER_NAME_LEN 16
+
/* Journal limits (in bytes) */
#define OCFS2_MIN_JOURNAL_SIZE (4 * 1024 * 1024)

@@ -513,6 +529,13 @@ struct ocfs2_slot_map_extended {
*/
};

+struct ocfs2_cluster_info {
+/*00*/ __u8 ci_stack[OCFS2_STACK_LABEL_LEN];
+ __le32 ci_reserved;
+/*08*/ __u8 ci_cluster[OCFS2_CLUSTER_NAME_LEN];
+/*18*/
+};
+
/*
* On disk superblock for OCFS2
* Note that it is contained inside an ocfs2_dinode, so all offsets
@@ -545,7 +568,20 @@ struct ocfs2_super_block {
* group header */
/*50*/ __u8 s_label[OCFS2_MAX_VOL_LABEL_LEN]; /* Label for mounting, etc. */
/*90*/ __u8 s_uuid[OCFS2_VOL_UUID_LEN]; /* 128-bit uuid */
-/*A0*/
+/*A0*/ struct ocfs2_cluster_info s_cluster_info; /* Selected userspace
+ stack. Only valid
+ with INCOMPAT flag. */
+/*B8*/ __le64 s_reserved2[17]; /* Fill out superblock */
+/*140*/
+
+ /*
+ * NOTE: As stated above, all offsets are relative to
+ * ocfs2_dinode.id2, which is at 0xC0 in the inode.
+ * 0xC0 + 0x140 = 0x200 or 512 bytes. A superblock must fit within
+ * our smallest blocksize, which is 512 bytes. To ensure this,
+ * we reserve the space in s_reserved2. Anything past s_reserved2
+ * will not be available on the smallest blocksize.
+ */
};

/*
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index e27a0d4..96ebe36 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -87,6 +87,7 @@ struct mount_options
unsigned int atime_quantum;
signed short slot;
unsigned int localalloc_opt;
+ char cluster_stack[OCFS2_STACK_LABEL_LEN + 1];
};

static int ocfs2_parse_options(struct super_block *sb, char *options,
@@ -152,6 +153,7 @@ enum {
Opt_commit,
Opt_localalloc,
Opt_localflocks,
+ Opt_stack,
Opt_err,
};

@@ -170,6 +172,7 @@ static match_table_t tokens = {
{Opt_commit, "commit=%u"},
{Opt_localalloc, "localalloc=%d"},
{Opt_localflocks, "localflocks"},
+ {Opt_stack, "cluster_stack=%s"},
{Opt_err, NULL}
};

@@ -549,8 +552,17 @@ static int ocfs2_verify_heartbeat(struct ocfs2_super *osb)
}
}

+ if (ocfs2_userspace_stack(osb)) {
+ if (osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL) {
+ mlog(ML_ERROR, "Userspace stack expected, but "
+ "o2cb heartbeat arguments passed to mount\n");
+ return -EINVAL;
+ }
+ }
+
if (!(osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL)) {
- if (!ocfs2_mount_local(osb) && !ocfs2_is_hard_readonly(osb)) {
+ if (!ocfs2_mount_local(osb) && !ocfs2_is_hard_readonly(osb) &&
+ !ocfs2_userspace_stack(osb)) {
mlog(ML_ERROR, "Heartbeat has to be started to mount "
"a read-write clustered device.\n");
return -EINVAL;
@@ -560,6 +572,35 @@ static int ocfs2_verify_heartbeat(struct ocfs2_super *osb)
return 0;
}

+/*
+ * If we're using a userspace stack, mount should have passed
+ * a name that matches the disk. If not, mount should not
+ * have passed a stack.
+ */
+static int ocfs2_verify_userspace_stack(struct ocfs2_super *osb,
+ struct mount_options *mopt)
+{
+ if (!ocfs2_userspace_stack(osb) && mopt->cluster_stack[0]) {
+ mlog(ML_ERROR,
+ "cluster stack passed to mount, but this filesystem "
+ "does not support it\n");
+ return -EINVAL;
+ }
+
+ if (ocfs2_userspace_stack(osb) &&
+ strncmp(osb->osb_cluster_stack, mopt->cluster_stack,
+ OCFS2_STACK_LABEL_LEN)) {
+ mlog(ML_ERROR,
+ "cluster stack passed to mount (\"%s\") does not "
+ "match the filesystem (\"%s\")\n",
+ mopt->cluster_stack,
+ osb->osb_cluster_stack);
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
{
struct dentry *root;
@@ -598,6 +639,10 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
osb->osb_commit_interval = parsed_options.commit_interval;
osb->local_alloc_size = parsed_options.localalloc_opt;

+ status = ocfs2_verify_userspace_stack(osb, &parsed_options);
+ if (status)
+ goto read_super_error;
+
sb->s_magic = OCFS2_SUPER_MAGIC;

/* Hard readonly mode only if: bdev_read_only, MS_RDONLY,
@@ -752,6 +797,7 @@ static int ocfs2_parse_options(struct super_block *sb,
mopt->atime_quantum = OCFS2_DEFAULT_ATIME_QUANTUM;
mopt->slot = OCFS2_INVALID_SLOT;
mopt->localalloc_opt = OCFS2_DEFAULT_LOCAL_ALLOC_SIZE;
+ mopt->cluster_stack[0] = '\0';

if (!options) {
status = 1;
@@ -853,6 +899,25 @@ static int ocfs2_parse_options(struct super_block *sb,
if (!is_remount)
mopt->mount_opt |= OCFS2_MOUNT_LOCALFLOCKS;
break;
+ case Opt_stack:
+ /* Check both that the option we were passed
+ * is of the right length and that it is a proper
+ * string of the right length.
+ */
+ if (((args[0].to - args[0].from) !=
+ OCFS2_STACK_LABEL_LEN) ||
+ (strnlen(args[0].from,
+ OCFS2_STACK_LABEL_LEN) !=
+ OCFS2_STACK_LABEL_LEN)) {
+ mlog(ML_ERROR,
+ "Invalid cluster_stack option\n");
+ status = 0;
+ goto bail;
+ }
+ memcpy(mopt->cluster_stack, args[0].from,
+ OCFS2_STACK_LABEL_LEN);
+ mopt->cluster_stack[OCFS2_STACK_LABEL_LEN] = '\0';
+ break;
default:
mlog(ML_ERROR,
"Unrecognized mount option \"%s\" "
@@ -911,6 +976,10 @@ static int ocfs2_show_options(struct seq_file *s, struct vfsmount *mnt)
if (opts & OCFS2_MOUNT_LOCALFLOCKS)
seq_printf(s, ",localflocks,");

+ if (osb->osb_cluster_stack[0])
+ seq_printf(s, ",cluster_stack=%.*s", OCFS2_STACK_LABEL_LEN,
+ osb->osb_cluster_stack);
+
return 0;
}

@@ -1403,6 +1472,25 @@ static int ocfs2_initialize_super(struct super_block *sb,
goto bail;
}

+ if (ocfs2_userspace_stack(osb)) {
+ memcpy(osb->osb_cluster_stack,
+ OCFS2_RAW_SB(di)->s_cluster_info.ci_stack,
+ OCFS2_STACK_LABEL_LEN);
+ osb->osb_cluster_stack[OCFS2_STACK_LABEL_LEN] = '\0';
+ if (strlen(osb->osb_cluster_stack) != OCFS2_STACK_LABEL_LEN) {
+ mlog(ML_ERROR,
+ "couldn't mount because of an invalid "
+ "cluster stack label (%s) \n",
+ osb->osb_cluster_stack);
+ status = -EINVAL;
+ goto bail;
+ }
+ } else {
+ /* The empty string is identical with classic tools that
+ * don't know about s_cluster_info. */
+ osb->osb_cluster_stack[0] = '\0';
+ }
+
get_random_bytes(&osb->s_next_generation, sizeof(u32));

/* FIXME
--
1.5.3.8

2008-03-06 00:30:09

by Joel Becker

Subject: [PATCH 11/18] ocfs2: Abstract out a debugging function for underlying dlms.

dlmglue.c was still referencing a raw o2dlm lksb in one instance. Let's
create a generic ocfs2_dlm_dump_lksb() function. This allows underlying
DLMs to print whatever they want about their lock.

We then move the o2dlm dump into stackglue.c where it belongs.

Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/dlmglue.c | 3 +--
fs/ocfs2/stackglue.c | 5 +++++
fs/ocfs2/stackglue.h | 1 +
3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index d48163f..58a34df 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -2803,8 +2803,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
- /* XXX Need to abstract this */
- dlm_print_one_lock(lockres->l_lksb.lksb_o2dlm.lockid);
+ ocfs2_dlm_dump_lksb(&lockres->l_lksb);
BUG();
}
mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n",
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index abdb9f6..bd80541 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -252,6 +252,11 @@ void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb)
return (void *)(lksb->lksb_o2dlm.lvb);
}

+void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
+{
+ dlm_print_one_lock(lksb->lksb_o2dlm.lockid);
+}
+
/*
* Called from the dlm when it's about to evict a node. This is how the
* classic stack signals node death.
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index 22af77b..01e3c9b 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -91,6 +91,7 @@ int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,

int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb);
void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb);
+void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb);

void o2cb_get_stack(struct ocfs2_locking_protocol *proto);
void o2cb_put_stack(void);
--
1.5.3.8

2008-03-06 00:30:55

by Joel Becker

Subject: [PATCH 13/18] ocfs2: Split o2cb code from generic stack functions.

Split off the o2cb-specific functionality from the generic stack glue
calls. This is a precursor to wrapping the o2cb functionality in an
operations vector.
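
To illustrate where this split is heading, here is a small userspace
sketch of generic glue calls delegating through an operations vector.
Everything in it is an assumption made for illustration: conn_model,
stack_ops_model, the glue_*() functions and the node number 7 are
invented here, and the real vector is left to a later patch in the
series.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical, simplified connection: only what this sketch needs. */
struct conn_model {
	const char *cc_name;
	int         cc_connected;
};

/* Hypothetical operations vector; not the layout used by the kernel. */
struct stack_ops_model {
	int (*connect)(struct conn_model *conn);
	int (*disconnect)(struct conn_model *conn);
	int (*this_node)(unsigned int *node);
};

/* Stack-specific half: these play the role of the o2cb_*() functions. */
static int o2cb_model_connect(struct conn_model *conn)
{
	conn->cc_connected = 1;
	return 0;
}

static int o2cb_model_disconnect(struct conn_model *conn)
{
	conn->cc_connected = 0;
	return 0;
}

static int o2cb_model_this_node(unsigned int *node)
{
	*node = 7;	/* arbitrary node number for the sketch */
	return 0;
}

static const struct stack_ops_model o2cb_model_ops = {
	.connect    = o2cb_model_connect,
	.disconnect = o2cb_model_disconnect,
	.this_node  = o2cb_model_this_node,
};

/* The glue layer points at whichever stack is active. */
static const struct stack_ops_model *active_ops = &o2cb_model_ops;

/* Generic half: argument checks live here, the work is delegated. */
static int glue_connect(struct conn_model *conn)
{
	if (conn == NULL || conn->cc_name == NULL)
		return -EINVAL;
	return active_ops->connect(conn);
}

static int glue_disconnect(struct conn_model *conn)
{
	if (conn == NULL)
		return -EINVAL;
	return active_ops->disconnect(conn);
}

static int glue_this_node(unsigned int *node)
{
	return active_ops->this_node(node);
}
```

In this model, swapping stacks means pointing active_ops at a different
vector. The callers of the glue_*() functions do not change.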

Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/stackglue.c | 209 ++++++++++++++++++++++++++++++++++----------------
1 files changed, 144 insertions(+), 65 deletions(-)

diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 51c2546..e35dde6 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -197,21 +197,19 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
lproto->lp_unlock_ast(astarg, error);
}

-int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
- int mode,
- union ocfs2_dlm_lksb *lksb,
- u32 flags,
- void *name,
- unsigned int namelen,
- void *astarg)
+static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
+ int mode,
+ union ocfs2_dlm_lksb *lksb,
+ u32 flags,
+ void *name,
+ unsigned int namelen,
+ void *astarg)
{
enum dlm_status status;
int o2dlm_mode = mode_to_o2dlm(mode);
int o2dlm_flags = flags_to_o2dlm(flags);
int ret;

- BUG_ON(lproto == NULL);
-
status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm,
o2dlm_flags, name, namelen,
o2dlm_lock_ast_wrapper, astarg,
@@ -220,43 +218,80 @@ int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
return ret;
}

-int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
- union ocfs2_dlm_lksb *lksb,
- u32 flags,
- void *astarg)
+int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
+ int mode,
+ union ocfs2_dlm_lksb *lksb,
+ u32 flags,
+ void *name,
+ unsigned int namelen,
+ void *astarg)
+{
+ BUG_ON(lproto == NULL);
+
+ return o2cb_dlm_lock(conn, mode, lksb, flags,
+ name, namelen, astarg);
+}
+
+static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn,
+ union ocfs2_dlm_lksb *lksb,
+ u32 flags,
+ void *astarg)
{
enum dlm_status status;
int o2dlm_flags = flags_to_o2dlm(flags);
int ret;

- BUG_ON(lproto == NULL);
-
status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm,
o2dlm_flags, o2dlm_unlock_ast_wrapper, astarg);
ret = dlm_status_to_errno(status);
return ret;
}

-int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
+int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
+ union ocfs2_dlm_lksb *lksb,
+ u32 flags,
+ void *astarg)
+{
+ BUG_ON(lproto == NULL);
+
+ return o2cb_dlm_unlock(conn, lksb, flags, astarg);
+}
+
+static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
{
return dlm_status_to_errno(lksb->lksb_o2dlm.status);
}

+int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
+{
+ return o2cb_dlm_lock_status(lksb);
+}
+
/*
* Why don't we cast to ocfs2_meta_lvb? The "clean" answer is that we
* don't cast at the glue level. The real answer is that the header
* ordering is nigh impossible.
*/
-void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb)
+static void *o2cb_dlm_lvb(union ocfs2_dlm_lksb *lksb)
{
return (void *)(lksb->lksb_o2dlm.lvb);
}

-void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
+void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb)
+{
+ return o2cb_dlm_lvb(lksb);
+}
+
+static void o2cb_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
{
dlm_print_one_lock(lksb->lksb_o2dlm.lockid);
}

+void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
+{
+ o2cb_dlm_dump_lksb(lksb);
+}
+
/*
* Called from the dlm when it's about to evict a node. This is how the
* classic stack signals node death.
@@ -271,6 +306,62 @@ static void o2dlm_eviction_cb(int node_num, void *data)
conn->cc_recovery_handler(node_num, conn->cc_recovery_data);
}

+static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn)
+{
+ int rc = 0;
+ u32 dlm_key;
+ struct dlm_ctxt *dlm;
+ struct o2dlm_private *priv;
+ struct dlm_protocol_version dlm_version;
+
+ BUG_ON(conn == NULL);
+
+ /* for now we only have one cluster/node, make sure we see it
+ * in the heartbeat universe */
+ if (!o2hb_check_local_node_heartbeating()) {
+ rc = -EINVAL;
+ goto out;
+ }
+
+ priv = kzalloc(sizeof(struct o2dlm_private), GFP_KERNEL);
+ if (!priv) {
+ rc = -ENOMEM;
+ goto out_free;
+ }
+
+ /* This just fills the structure in. It is safe to pass conn. */
+ dlm_setup_eviction_cb(&priv->op_eviction_cb, o2dlm_eviction_cb,
+ conn);
+
+ conn->cc_private = priv;
+
+ /* used by the dlm code to make message headers unique, each
+ * node in this domain must agree on this. */
+ dlm_key = crc32_le(0, conn->cc_name, conn->cc_namelen);
+ dlm_version.pv_major = conn->cc_version.pv_major;
+ dlm_version.pv_minor = conn->cc_version.pv_minor;
+
+ dlm = dlm_register_domain(conn->cc_name, dlm_key, &dlm_version);
+ if (IS_ERR(dlm)) {
+ rc = PTR_ERR(dlm);
+ mlog_errno(rc);
+ goto out_free;
+ }
+
+ conn->cc_version.pv_major = dlm_version.pv_major;
+ conn->cc_version.pv_minor = dlm_version.pv_minor;
+ conn->cc_lockspace = dlm;
+
+ dlm_register_eviction_cb(dlm, &priv->op_eviction_cb);
+
+out_free:
+ if (rc && conn->cc_private)
+ kfree(conn->cc_private);
+
+out:
+ return rc;
+}
+
int ocfs2_cluster_connect(const char *group,
int grouplen,
void (*recovery_handler)(int node_num,
@@ -280,10 +371,6 @@ int ocfs2_cluster_connect(const char *group,
{
int rc = 0;
struct ocfs2_cluster_connection *new_conn;
- u32 dlm_key;
- struct dlm_ctxt *dlm;
- struct o2dlm_private *priv;
- struct dlm_protocol_version dlm_version;

BUG_ON(group == NULL);
BUG_ON(conn == NULL);
@@ -294,13 +381,6 @@ int ocfs2_cluster_connect(const char *group,
goto out;
}

- /* for now we only have one cluster/node, make sure we see it
- * in the heartbeat universe */
- if (!o2hb_check_local_node_heartbeating()) {
- rc = -EINVAL;
- goto out;
- }
-
new_conn = kzalloc(sizeof(struct ocfs2_cluster_connection),
GFP_KERNEL);
if (!new_conn) {
@@ -316,64 +396,53 @@ int ocfs2_cluster_connect(const char *group,
/* Start the new connection at our maximum compatibility level */
new_conn->cc_version = lproto->lp_max_version;

- priv = kzalloc(sizeof(struct o2dlm_private), GFP_KERNEL);
- if (!priv) {
- rc = -ENOMEM;
- goto out_free;
- }
-
- /* This just fills the structure in. It is safe to use new_conn. */
- dlm_setup_eviction_cb(&priv->op_eviction_cb, o2dlm_eviction_cb,
- new_conn);
-
- new_conn->cc_private = priv;
-
- /* used by the dlm code to make message headers unique, each
- * node in this domain must agree on this. */
- dlm_key = crc32_le(0, group, grouplen);
- dlm_version.pv_major = new_conn->cc_version.pv_major;
- dlm_version.pv_minor = new_conn->cc_version.pv_minor;
-
- dlm = dlm_register_domain(group, dlm_key, &dlm_version);
- if (IS_ERR(dlm)) {
- rc = PTR_ERR(dlm);
+ rc = o2cb_cluster_connect(new_conn);
+ if (rc) {
mlog_errno(rc);
goto out_free;
}

- new_conn->cc_version.pv_major = dlm_version.pv_major;
- new_conn->cc_version.pv_minor = dlm_version.pv_minor;
- new_conn->cc_lockspace = dlm;
-
- dlm_register_eviction_cb(dlm, &priv->op_eviction_cb);
-
*conn = new_conn;

out_free:
- if (rc) {
- kfree(new_conn->cc_private);
+ if (rc)
kfree(new_conn);
- }

out:
return rc;
}


-int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn)
+static int o2cb_cluster_disconnect(struct ocfs2_cluster_connection *conn)
{
struct dlm_ctxt *dlm = conn->cc_lockspace;
struct o2dlm_private *priv = conn->cc_private;

dlm_unregister_eviction_cb(&priv->op_eviction_cb);
- dlm_unregister_domain(dlm);
-
+ conn->cc_private = NULL;
kfree(priv);
- kfree(conn);
+
+ dlm_unregister_domain(dlm);
+ conn->cc_lockspace = NULL;

return 0;
}

+int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn)
+{
+ int ret;
+
+ BUG_ON(conn == NULL);
+
+ ret = o2cb_cluster_disconnect(conn);
+
+ /* XXX Should we free it anyway? */
+ if (!ret)
+ kfree(conn);
+
+ return ret;
+}
+
static void o2hb_stop(const char *group)
{
int ret;
@@ -406,15 +475,20 @@ static void o2hb_stop(const char *group)
*
* Other stacks will eventually provide a NULL ->hangup() pointer.
*/
+static void o2cb_cluster_hangup(const char *group, int grouplen)
+{
+ o2hb_stop(group);
+}
+
void ocfs2_cluster_hangup(const char *group, int grouplen)
{
BUG_ON(group == NULL);
BUG_ON(group[grouplen] != '\0');

- o2hb_stop(group);
+ o2cb_cluster_hangup(group, grouplen);
}

-int ocfs2_cluster_this_node(unsigned int *node)
+static int o2cb_cluster_this_node(unsigned int *node)
{
int node_num;

@@ -429,6 +503,11 @@ int ocfs2_cluster_this_node(unsigned int *node)
return 0;
}

+int ocfs2_cluster_this_node(unsigned int *node)
+{
+ return o2cb_cluster_this_node(node);
+}
+
void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)
{
BUG_ON(proto != NULL);
--
1.5.3.8

2008-03-06 00:31:32

by Joel Becker

Subject: [PATCH 05/18] ocfs2: Introduce the new ocfs2_cluster_connect/disconnect() API.

This step introduces a cluster stack agnostic API for initializing and
exiting. fs/ocfs2/dlmglue.c no longer uses o2cb/o2dlm knowledge to
connect to the stack. It is all handled in stackglue.c.

heartbeat.c no longer needs to know how it gets called.
ocfs2_do_node_down() is now a clean recovery trigger.

The big gotcha is the ordering of initializations and de-initializations done
underneath ocfs2_cluster_connect(). ocfs2_dlm_init() used to do all
o2dlm initialization in one block. Thus, the o2dlm functionality of
ocfs2_cluster_connect() is very straightforward. ocfs2_dlm_shutdown(),
however, did a few things between de-registration of the eviction
callback and actually shutting down the domain. Now de-registration and
shutdown of the domain are wrapped within the single
ocfs2_cluster_disconnect() call. I've checked the code paths to make
sure we can safely tear down things in ocfs2_dlm_shutdown() before
calling ocfs2_cluster_disconnect(). The filesystem has already set
itself to ignore the callback.
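
The callback handling described above can be modelled in userspace as
follows. This is only a sketch under stated assumptions: fs_model,
conn_model and the model_*() functions are invented names, and
model_evict() stands in for the stack reporting a dead node. The point
shown is that the recovery handler ignores events until the filesystem
has stored its connection, and that the connection is dropped last at
shutdown.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

typedef void (*recovery_fn)(int node_num, void *data);

/* Hypothetical connection: just the recovery handler and its argument. */
struct conn_model {
	recovery_fn cc_recovery_handler;
	void       *cc_recovery_data;
};

/* Hypothetical filesystem state. */
struct fs_model {
	struct conn_model *cconn;		/* NULL until the cluster is up */
	int                recovered_node;	/* last node recovered, -1 if none */
};

/* Plays the role of ocfs2_do_node_down(): ignore events with no cconn. */
static void fs_node_down(int node_num, void *data)
{
	struct fs_model *fs = data;

	if (!fs->cconn)
		return;
	fs->recovered_node = node_num;
}

static int model_connect(recovery_fn handler, void *data,
			 struct conn_model **conn)
{
	struct conn_model *c = calloc(1, sizeof(*c));

	if (!c)
		return -ENOMEM;
	c->cc_recovery_handler = handler;
	c->cc_recovery_data = data;
	*conn = c;
	return 0;
}

/* The stack reports a dead node through the stored handler. */
static void model_evict(struct conn_model *conn, int node_num)
{
	conn->cc_recovery_handler(node_num, conn->cc_recovery_data);
}

static void model_disconnect(struct conn_model *conn)
{
	free(conn);
}

static void fs_shutdown(struct fs_model *fs)
{
	/* Filesystem-side teardown would run here, before the disconnect. */
	model_disconnect(fs->cconn);
	fs->cconn = NULL;
}
```

An eviction delivered between model_connect() returning and the
filesystem storing the connection is dropped by the handler. In the
patch, the slot check after the cluster comes up is what covers that
window.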

Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/dlmglue.c | 97 ++++++++++++++++++-------------------
fs/ocfs2/dlmglue.h | 1 -
fs/ocfs2/heartbeat.c | 40 +++------------
fs/ocfs2/heartbeat.h | 2 +-
fs/ocfs2/ocfs2.h | 4 +-
fs/ocfs2/stackglue.c | 131 +++++++++++++++++++++++++++++++++++++++++++++++--
fs/ocfs2/stackglue.h | 35 +++++++++++++-
fs/ocfs2/super.c | 13 ++---
8 files changed, 221 insertions(+), 102 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 0053945..9e13d4b 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -27,7 +27,6 @@
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
-#include <linux/crc32.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
@@ -259,31 +258,6 @@ static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
.flags = 0,
};

-/*
- * This is the filesystem locking protocol version.
- *
- * Whenever the filesystem does new things with locks (adds or removes a
- * lock, orders them differently, does different things underneath a lock),
- * the version must be changed. The protocol is negotiated when joining
- * the dlm domain. A node may join the domain if its major version is
- * identical to all other nodes and its minor version is greater than
- * or equal to all other nodes. When its minor version is greater than
- * the other nodes, it will run at the minor version specified by the
- * other nodes.
- *
- * If a locking change is made that will not be compatible with older
- * versions, the major number must be increased and the minor version set
- * to zero. If a change merely adds a behavior that can be disabled when
- * speaking to older versions, the minor version must be increased. If a
- * change adds a fully backwards compatible change (eg, LVB changes that
- * are just ignored by older versions), the version does not need to be
- * updated.
- */
-const struct dlm_protocol_version ocfs2_locking_protocol = {
- .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
- .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
-};
-
static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
return lockres->l_type == OCFS2_LOCK_TYPE_META ||
@@ -886,7 +860,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
spin_unlock_irqrestore(&lockres->l_lock, flags);

- ret = ocfs2_dlm_lock(osb->dlm,
+ ret = ocfs2_dlm_lock(osb->cconn,
level,
&lockres->l_lksb,
dlm_flags,
@@ -1085,7 +1059,7 @@ again:
lockres->l_name, lockres->l_level, level);

/* call dlm_lock to upgrade lock now */
- ret = ocfs2_dlm_lock(osb->dlm,
+ ret = ocfs2_dlm_lock(osb->cconn,
level,
&lockres->l_lksb,
lkm_flags,
@@ -1492,7 +1466,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
spin_unlock_irqrestore(&lockres->l_lock, flags);

- ret = ocfs2_dlm_lock(osb->dlm, level, &lockres->l_lksb, lkm_flags,
+ ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
if (ret) {
@@ -2485,8 +2459,7 @@ static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
int ocfs2_dlm_init(struct ocfs2_super *osb)
{
int status = 0;
- u32 dlm_key;
- struct dlm_ctxt *dlm = NULL;
+ struct ocfs2_cluster_connection *conn = NULL;

mlog_entry_void();

@@ -2508,26 +2481,21 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
goto bail;
}

- /* used by the dlm code to make message headers unique, each
- * node in this domain must agree on this. */
- dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
-
/* for now, uuid == domain */
- dlm = dlm_register_domain(osb->uuid_str, dlm_key,
- &osb->osb_locking_proto);
- if (IS_ERR(dlm)) {
- status = PTR_ERR(dlm);
+ status = ocfs2_cluster_connect(osb->uuid_str,
+ strlen(osb->uuid_str),
+ ocfs2_do_node_down, osb,
+ &conn);
+ if (status) {
mlog_errno(status);
goto bail;
}

- dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
-
local:
ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);

- osb->dlm = dlm;
+ osb->cconn = conn;

status = 0;
bail:
@@ -2545,10 +2513,14 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
mlog_entry_void();

- dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
-
ocfs2_drop_osb_locks(osb);

+ /*
+ * Now that we have dropped all locks and ocfs2_dismount_volume()
+ * has disabled recovery, the DLM won't be talking to us. It's
+ * safe to tear things down before disconnecting the cluster.
+ */
+
if (osb->dc_task) {
kthread_stop(osb->dc_task);
osb->dc_task = NULL;
@@ -2557,8 +2529,8 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
ocfs2_lock_res_free(&osb->osb_super_lockres);
ocfs2_lock_res_free(&osb->osb_rename_lockres);

- dlm_unregister_domain(osb->dlm);
- osb->dlm = NULL;
+ ocfs2_cluster_disconnect(osb->cconn);
+ osb->cconn = NULL;

ocfs2_dlm_shutdown_debug(osb);

@@ -2689,7 +2661,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,

mlog(0, "lock %s\n", lockres->l_name);

- ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, lkm_flags,
+ ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
lockres);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
@@ -2823,7 +2795,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
if (lvb)
dlm_flags |= DLM_LKF_VALBLK;

- ret = ocfs2_dlm_lock(osb->dlm,
+ ret = ocfs2_dlm_lock(osb->cconn,
new_level,
&lockres->l_lksb,
dlm_flags,
@@ -2882,7 +2854,7 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
mlog_entry_void();
mlog(0, "lock %s\n", lockres->l_name);

- ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb,
+ ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
DLM_LKF_CANCEL, lockres);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
@@ -3193,7 +3165,34 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
return UNBLOCK_CONTINUE_POST;
}

+/*
+ * This is the filesystem locking protocol. It provides the lock handling
+ * hooks for the underlying DLM. It has a maximum version number.
+ * The version number allows interoperability with systems running at
+ * the same major number and an equal or smaller minor number.
+ *
+ * Whenever the filesystem does new things with locks (adds or removes a
+ * lock, orders them differently, does different things underneath a lock),
+ * the version must be changed. The protocol is negotiated when joining
+ * the dlm domain. A node may join the domain if its major version is
+ * identical to all other nodes and its minor version is greater than
+ * or equal to all other nodes. When its minor version is greater than
+ * the other nodes, it will run at the minor version specified by the
+ * other nodes.
+ *
+ * If a locking change is made that will not be compatible with older
+ * versions, the major number must be increased and the minor version set
+ * to zero. If a change merely adds a behavior that can be disabled when
+ * speaking to older versions, the minor version must be increased. If a
+ * change adds a fully backwards compatible change (eg, LVB changes that
+ * are just ignored by older versions), the version does not need to be
+ * updated.
+ */
static struct ocfs2_locking_protocol lproto = {
+ .lp_max_version = {
+ .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
+ .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
+ },
.lp_lock_ast = ocfs2_locking_ast,
.lp_blocking_ast = ocfs2_blocking_ast,
.lp_unlock_ast = ocfs2_unlock_ast,
diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
index 3238043..2d0a8a0 100644
--- a/fs/ocfs2/dlmglue.h
+++ b/fs/ocfs2/dlmglue.h
@@ -117,5 +117,4 @@ void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug);
void dlmglue_init_stack(void);
void dlmglue_exit_stack(void);

-extern const struct dlm_protocol_version ocfs2_locking_protocol;
#endif /* DLMGLUE_H */
diff --git a/fs/ocfs2/heartbeat.c b/fs/ocfs2/heartbeat.c
index 80de239..dcac1a4 100644
--- a/fs/ocfs2/heartbeat.c
+++ b/fs/ocfs2/heartbeat.c
@@ -30,8 +30,6 @@
#include <linux/highmem.h>
#include <linux/kmod.h>

-#include <dlm/dlmapi.h>
-
#define MLOG_MASK_PREFIX ML_SUPER
#include <cluster/masklog.h>

@@ -64,19 +62,20 @@ void ocfs2_init_node_maps(struct ocfs2_super *osb)
ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs);
}

-static void ocfs2_do_node_down(int node_num,
- struct ocfs2_super *osb)
+void ocfs2_do_node_down(int node_num, void *data)
{
+ struct ocfs2_super *osb = data;
+
BUG_ON(osb->node_num == node_num);

mlog(0, "ocfs2: node down event for %d\n", node_num);

- if (!osb->dlm) {
+ if (!osb->cconn) {
/*
- * No DLM means we're not even ready to participate yet.
- * We check the slots after the DLM comes up, so we will
- * notice the node death then. We can safely ignore it
- * here.
+ * No cluster connection means we're not even ready to
+ * participate yet. We check the slots after the cluster
+ * comes up, so we will notice the node death then. We
+ * can safely ignore it here.
*/
return;
}
@@ -84,29 +83,6 @@ static void ocfs2_do_node_down(int node_num,
ocfs2_recovery_thread(osb, node_num);
}

-/* Called from the dlm when it's about to evict a node. We may also
- * get a heartbeat callback later. */
-static void ocfs2_dlm_eviction_cb(int node_num,
- void *data)
-{
- struct ocfs2_super *osb = (struct ocfs2_super *) data;
- struct super_block *sb = osb->sb;
-
- mlog(ML_NOTICE, "device (%u,%u): dlm has evicted node %d\n",
- MAJOR(sb->s_dev), MINOR(sb->s_dev), node_num);
-
- ocfs2_do_node_down(node_num, osb);
-}
-
-void ocfs2_setup_hb_callbacks(struct ocfs2_super *osb)
-{
- /* Not exactly a heartbeat callback, but leads to essentially
- * the same path so we set it up here. */
- dlm_setup_eviction_cb(&osb->osb_eviction_cb,
- ocfs2_dlm_eviction_cb,
- osb);
-}
-
void ocfs2_stop_heartbeat(struct ocfs2_super *osb)
{
int ret;
diff --git a/fs/ocfs2/heartbeat.h b/fs/ocfs2/heartbeat.h
index 98d8ffc..38e2450 100644
--- a/fs/ocfs2/heartbeat.h
+++ b/fs/ocfs2/heartbeat.h
@@ -28,7 +28,7 @@

void ocfs2_init_node_maps(struct ocfs2_super *osb);

-void ocfs2_setup_hb_callbacks(struct ocfs2_super *osb);
+void ocfs2_do_node_down(int node_num, void *data);
void ocfs2_stop_heartbeat(struct ocfs2_super *osb);

/* node map functions - used to keep track of mounted and in-recovery
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 6d7c6d2..664e4fe 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -248,12 +248,10 @@ struct ocfs2_super
struct ocfs2_alloc_stats alloc_stats;
char dev_str[20]; /* "major,minor" of the device */

- struct dlm_ctxt *dlm;
+ struct ocfs2_cluster_connection *cconn;
struct ocfs2_lock_res osb_super_lockres;
struct ocfs2_lock_res osb_rename_lockres;
- struct dlm_eviction_cb osb_eviction_cb;
struct ocfs2_dlm_debug *osb_dlm_debug;
- struct dlm_protocol_version osb_locking_proto;

struct dentry *osb_debug_root;

diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index eb88854..f6f309a 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -18,11 +18,21 @@
* General Public License for more details.
*/

+#include <linux/slab.h>
+#include <linux/crc32.h>
+
+/* Needed for AOP_TRUNCATED_PAGE in mlog_errno() */
+#include <linux/fs.h>
+
#include "cluster/masklog.h"
#include "stackglue.h"

static struct ocfs2_locking_protocol *lproto;

+struct o2dlm_private {
+ struct dlm_eviction_cb op_eviction_cb;
+};
+
/* These should be identical */
#if (DLM_LOCK_IV != LKM_IVMODE)
# error Lock modes do not match
@@ -197,7 +207,7 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
lproto->lp_unlock_ast(astarg, error);
}

-int ocfs2_dlm_lock(struct dlm_ctxt *dlm,
+int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
union ocfs2_dlm_lksb *lksb,
u32 flags,
@@ -212,15 +222,15 @@ int ocfs2_dlm_lock(struct dlm_ctxt *dlm,

BUG_ON(lproto == NULL);

- status = dlmlock(dlm, o2dlm_mode, &lksb->lksb_o2dlm, o2dlm_flags,
- name, namelen,
+ status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm,
+ o2dlm_flags, name, namelen,
o2dlm_lock_ast_wrapper, astarg,
o2dlm_blocking_ast_wrapper);
ret = dlm_status_to_errno(status);
return ret;
}

-int ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
+int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *astarg)
@@ -231,8 +241,8 @@ int ocfs2_dlm_unlock(struct dlm_ctxt *dlm,

BUG_ON(lproto == NULL);

- status = dlmunlock(dlm, &lksb->lksb_o2dlm, o2dlm_flags,
- o2dlm_unlock_ast_wrapper, astarg);
+ status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm,
+ o2dlm_flags, o2dlm_unlock_ast_wrapper, astarg);
ret = dlm_status_to_errno(status);
return ret;
}
@@ -252,6 +262,115 @@ void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb)
return (void *)(lksb->lksb_o2dlm.lvb);
}

+/*
+ * Called from the dlm when it's about to evict a node. This is how the
+ * classic stack signals node death.
+ */
+static void o2dlm_eviction_cb(int node_num, void *data)
+{
+ struct ocfs2_cluster_connection *conn = data;
+
+ mlog(ML_NOTICE, "o2dlm has evicted node %d from group %.*s\n",
+ node_num, conn->cc_namelen, conn->cc_name);
+
+ conn->cc_recovery_handler(node_num, conn->cc_recovery_data);
+}
+
+int ocfs2_cluster_connect(const char *group,
+ int grouplen,
+ void (*recovery_handler)(int node_num,
+ void *recovery_data),
+ void *recovery_data,
+ struct ocfs2_cluster_connection **conn)
+{
+ int rc = 0;
+ struct ocfs2_cluster_connection *new_conn;
+ u32 dlm_key;
+ struct dlm_ctxt *dlm;
+ struct o2dlm_private *priv;
+ struct dlm_protocol_version dlm_version;
+
+ BUG_ON(group == NULL);
+ BUG_ON(conn == NULL);
+ BUG_ON(recovery_handler == NULL);
+
+ if (grouplen > GROUP_NAME_MAX) {
+ rc = -EINVAL;
+ goto out;
+ }
+
+ new_conn = kzalloc(sizeof(struct ocfs2_cluster_connection),
+ GFP_KERNEL);
+ if (!new_conn) {
+ rc = -ENOMEM;
+ goto out;
+ }
+
+ memcpy(new_conn->cc_name, group, grouplen);
+ new_conn->cc_namelen = grouplen;
+ new_conn->cc_recovery_handler = recovery_handler;
+ new_conn->cc_recovery_data = recovery_data;
+
+ /* Start the new connection at our maximum compatibility level */
+ new_conn->cc_version = lproto->lp_max_version;
+
+ priv = kzalloc(sizeof(struct o2dlm_private), GFP_KERNEL);
+ if (!priv) {
+ rc = -ENOMEM;
+ goto out_free;
+ }
+
+ /* This just fills the structure in. It is safe to use new_conn. */
+ dlm_setup_eviction_cb(&priv->op_eviction_cb, o2dlm_eviction_cb,
+ new_conn);
+
+ new_conn->cc_private = priv;
+
+ /* used by the dlm code to make message headers unique, each
+ * node in this domain must agree on this. */
+ dlm_key = crc32_le(0, group, grouplen);
+ dlm_version.pv_major = new_conn->cc_version.pv_major;
+ dlm_version.pv_minor = new_conn->cc_version.pv_minor;
+
+ dlm = dlm_register_domain(group, dlm_key, &dlm_version);
+ if (IS_ERR(dlm)) {
+ rc = PTR_ERR(dlm);
+ mlog_errno(rc);
+ goto out_free;
+ }
+
+ new_conn->cc_version.pv_major = dlm_version.pv_major;
+ new_conn->cc_version.pv_minor = dlm_version.pv_minor;
+ new_conn->cc_lockspace = dlm;
+
+ dlm_register_eviction_cb(dlm, &priv->op_eviction_cb);
+
+ *conn = new_conn;
+
+out_free:
+ if (rc) {
+ kfree(new_conn->cc_private);
+ kfree(new_conn);
+ }
+
+out:
+ return rc;
+}
+
+int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn)
+{
+ struct dlm_ctxt *dlm = conn->cc_lockspace;
+ struct o2dlm_private *priv = conn->cc_private;
+
+ dlm_unregister_eviction_cb(&priv->op_eviction_cb);
+ dlm_unregister_domain(dlm);
+
+ kfree(priv);
+ kfree(conn);
+
+ return 0;
+}
+
void o2cb_get_stack(struct ocfs2_locking_protocol *proto)
{
BUG_ON(proto == NULL);
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index 3c91e24..3900b5c 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -32,9 +32,22 @@
*/
#define DLM_LKF_LOCAL 0x00100000

+/*
+ * This shadows DLM_LOCKSPACE_LEN in fs/dlm/dlm_internal.h. That probably
+ * wants to be in a public header.
+ */
+#define GROUP_NAME_MAX 64
+
+
#include "dlm/dlmapi.h"

+struct ocfs2_protocol_version {
+ u8 pv_major;
+ u8 pv_minor;
+};
+
struct ocfs2_locking_protocol {
+ struct ocfs2_protocol_version lp_max_version;
void (*lp_lock_ast)(void *astarg);
void (*lp_blocking_ast)(void *astarg, int level);
void (*lp_unlock_ast)(void *astarg, int error);
@@ -44,14 +57,32 @@ union ocfs2_dlm_lksb {
struct dlm_lockstatus lksb_o2dlm;
};

-int ocfs2_dlm_lock(struct dlm_ctxt *dlm,
+struct ocfs2_cluster_connection {
+ char cc_name[GROUP_NAME_MAX];
+ int cc_namelen;
+ struct ocfs2_protocol_version cc_version;
+ void (*cc_recovery_handler)(int node_num, void *recovery_data);
+ void *cc_recovery_data;
+ void *cc_lockspace;
+ void *cc_private;
+};
+
+int ocfs2_cluster_connect(const char *group,
+ int grouplen,
+ void (*recovery_handler)(int node_num,
+ void *recovery_data),
+ void *recovery_data,
+ struct ocfs2_cluster_connection **conn);
+int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn);
+
+int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen,
void *astarg);
-int ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
+int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *astarg);
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index c867546..0ee4975 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -1251,9 +1251,9 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)

ocfs2_sync_blockdev(sb);

- /* No dlm means we've failed during mount, so skip all the
- * steps which depended on that to complete. */
- if (osb->dlm) {
+ /* No cluster connection means we've failed during mount, so skip
+ * all the steps which depended on that to complete. */
+ if (osb->cconn) {
tmp = ocfs2_super_lock(osb, 1);
if (tmp < 0) {
mlog_errno(tmp);
@@ -1264,12 +1264,12 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
if (osb->slot_num != OCFS2_INVALID_SLOT)
ocfs2_put_slot(osb);

- if (osb->dlm)
+ if (osb->cconn)
ocfs2_super_unlock(osb, 1);

ocfs2_release_system_inodes(osb);

- if (osb->dlm)
+ if (osb->cconn)
ocfs2_dlm_shutdown(osb);

debugfs_remove(osb->osb_debug_root);
@@ -1341,7 +1341,6 @@ static int ocfs2_initialize_super(struct super_block *sb,
sb->s_fs_info = osb;
sb->s_op = &ocfs2_sops;
sb->s_export_op = &ocfs2_export_ops;
- osb->osb_locking_proto = ocfs2_locking_protocol;
sb->s_time_gran = 1;
sb->s_flags |= MS_NOATIME;
/* this is needed to support O_LARGEFILE */
@@ -1391,8 +1390,6 @@ static int ocfs2_initialize_super(struct super_block *sb,
osb->local_alloc_state = OCFS2_LA_UNUSED;
osb->local_alloc_bh = NULL;

- ocfs2_setup_hb_callbacks(osb);
-
init_waitqueue_head(&osb->osb_mount_event);

osb->vol_label = kmalloc(OCFS2_MAX_VOL_LABEL_LEN, GFP_KERNEL);
--
1.5.3.8

2008-03-06 00:31:58

by Joel Becker

Subject: [PATCH 07/18] ocfs2: Move o2hb functionality into the stack glue.

The last bit of the classic stack used directly in ocfs2 code is o2hb,
specifically the check for heartbeat during mount and the call to
ocfs2_hb_ctl during unmount.

We create an extra API, ocfs2_cluster_hangup(), to encapsulate the call
to ocfs2_hb_ctl. Other stacks will just leave hangup() empty.

The check for heartbeat is moved into ocfs2_cluster_connect(). It will
be matched by a similar check for other stacks.

With this change, only stackglue.c includes cluster/ headers.
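
As a rough illustration of when hangup is called, here is a userspace
sketch (not kernel code). The struct and the mock_* names are
hypothetical stand-ins; only the three-part condition and the
NUL-termination check are taken from the hunks in this patch.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-in for the fields of struct ocfs2_super used here. */
struct mock_super {
	int local_mount;	/* nonzero for a local (non-clustered) mount */
	const char *uuid_str;	/* NULL if mount failed before the UUID was set */
};

/* Counts calls so a caller can observe whether hangup happened. */
static int mock_hangup_calls;

/*
 * Stand-in for ocfs2_cluster_hangup(). The real function runs
 * ocfs2_hb_ctl through a usermode helper; this mock only checks its
 * arguments the same way and records the call.
 */
static void mock_cluster_hangup(const char *group, int grouplen)
{
	assert(group != NULL);
	assert(group[grouplen] == '\0');
	mock_hangup_calls++;
}

/*
 * Mirrors the condition added to ocfs2_dismount_volume():
 * skip hangup on mount error, on local mounts, and when no UUID exists.
 * Returns 1 if hangup was called, 0 otherwise.
 */
static int mock_dismount_hangup(const struct mock_super *osb, int mnt_err)
{
	if (!mnt_err && !osb->local_mount && osb->uuid_str) {
		mock_cluster_hangup(osb->uuid_str,
				    (int)strlen(osb->uuid_str));
		return 1;
	}
	return 0;
}
```

Keeping the three checks in the caller means the glue function itself
stays unconditional, which is presumably what lets other stacks leave
it empty.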

Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/dlmglue.c | 4 ----
fs/ocfs2/heartbeat.c | 33 ---------------------------------
fs/ocfs2/heartbeat.h | 1 -
fs/ocfs2/ioctl.c | 1 +
fs/ocfs2/ocfs2.h | 4 ----
fs/ocfs2/stackglue.c | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++
fs/ocfs2/stackglue.h | 1 +
fs/ocfs2/super.c | 23 ++++++++++-------------
8 files changed, 62 insertions(+), 55 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 9e13d4b..aa13d15 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -32,10 +32,6 @@
#include <linux/debugfs.h>
#include <linux/seq_file.h>

-#include <cluster/heartbeat.h>
-#include <cluster/nodemanager.h>
-#include <cluster/tcp.h>
-
#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

diff --git a/fs/ocfs2/heartbeat.c b/fs/ocfs2/heartbeat.c
index dcac1a4..c6e7213 100644
--- a/fs/ocfs2/heartbeat.c
+++ b/fs/ocfs2/heartbeat.c
@@ -28,7 +28,6 @@
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
-#include <linux/kmod.h>

#define MLOG_MASK_PREFIX ML_SUPER
#include <cluster/masklog.h>
@@ -83,38 +82,6 @@ void ocfs2_do_node_down(int node_num, void *data)
ocfs2_recovery_thread(osb, node_num);
}

-void ocfs2_stop_heartbeat(struct ocfs2_super *osb)
-{
- int ret;
- char *argv[5], *envp[3];
-
- if (ocfs2_mount_local(osb))
- return;
-
- if (!osb->uuid_str) {
- /* This can happen if we don't get far enough in mount... */
- mlog(0, "No UUID with which to stop heartbeat!\n\n");
- return;
- }
-
- argv[0] = (char *)o2nm_get_hb_ctl_path();
- argv[1] = "-K";
- argv[2] = "-u";
- argv[3] = osb->uuid_str;
- argv[4] = NULL;
-
- mlog(0, "Run: %s %s %s %s\n", argv[0], argv[1], argv[2], argv[3]);
-
- /* minimal command environment taken from cpu_run_sbin_hotplug */
- envp[0] = "HOME=/";
- envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
- envp[2] = NULL;
-
- ret = call_usermodehelper(argv[0], argv, envp, UMH_WAIT_PROC);
- if (ret < 0)
- mlog_errno(ret);
-}
-
static inline void __ocfs2_node_map_set_bit(struct ocfs2_node_map *map,
int bit)
{
diff --git a/fs/ocfs2/heartbeat.h b/fs/ocfs2/heartbeat.h
index 38e2450..74b9c5d 100644
--- a/fs/ocfs2/heartbeat.h
+++ b/fs/ocfs2/heartbeat.h
@@ -29,7 +29,6 @@
void ocfs2_init_node_maps(struct ocfs2_super *osb);

void ocfs2_do_node_down(int node_num, void *data);
-void ocfs2_stop_heartbeat(struct ocfs2_super *osb);

/* node map functions - used to keep track of mounted and in-recovery
* nodes. */
diff --git a/fs/ocfs2/ioctl.c b/fs/ocfs2/ioctl.c
index 5177fba..ab1c216 100644
--- a/fs/ocfs2/ioctl.c
+++ b/fs/ocfs2/ioctl.c
@@ -7,6 +7,7 @@

#include <linux/fs.h>
#include <linux/mount.h>
+#include <linux/smp_lock.h>

#define MLOG_MASK_PREFIX ML_INODE
#include <cluster/masklog.h>
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 7006aba..31dc28b 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -36,10 +36,6 @@
#include <linux/mutex.h>
#include <linux/jbd.h>

-#include "cluster/nodemanager.h"
-#include "cluster/heartbeat.h"
-#include "cluster/tcp.h"
-
/* For union ocfs2_dlm_lksb */
#include "stackglue.h"

diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 8146863..670fa94 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -20,12 +20,14 @@

#include <linux/slab.h>
#include <linux/crc32.h>
+#include <linux/kmod.h>

/* Needed for AOP_TRUNCATED_PAGE in mlog_errno() */
#include <linux/fs.h>

#include "cluster/masklog.h"
#include "cluster/nodemanager.h"
+#include "cluster/heartbeat.h"

#include "stackglue.h"

@@ -301,6 +303,13 @@ int ocfs2_cluster_connect(const char *group,
goto out;
}

+ /* for now we only have one cluster/node, make sure we see it
+ * in the heartbeat universe */
+ if (!o2hb_check_local_node_heartbeating()) {
+ rc = -EINVAL;
+ goto out;
+ }
+
new_conn = kzalloc(sizeof(struct ocfs2_cluster_connection),
GFP_KERNEL);
if (!new_conn) {
@@ -359,6 +368,7 @@ out:
return rc;
}

+
int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn)
{
struct dlm_ctxt *dlm = conn->cc_lockspace;
@@ -373,6 +383,46 @@ int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn)
return 0;
}

+static void o2hb_stop(const char *group)
+{
+ int ret;
+ char *argv[5], *envp[3];
+
+ argv[0] = (char *)o2nm_get_hb_ctl_path();
+ argv[1] = "-K";
+ argv[2] = "-u";
+ argv[3] = (char *)group;
+ argv[4] = NULL;
+
+ mlog(0, "Run: %s %s %s %s\n", argv[0], argv[1], argv[2], argv[3]);
+
+ /* minimal command environment taken from cpu_run_sbin_hotplug */
+ envp[0] = "HOME=/";
+ envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
+ envp[2] = NULL;
+
+ ret = call_usermodehelper(argv[0], argv, envp, UMH_WAIT_PROC);
+ if (ret < 0)
+ mlog_errno(ret);
+}
+
+/*
+ * Hangup is a hack for tools compatibility. Older ocfs2-tools software
+ * expects the filesystem to call "ocfs2_hb_ctl" during unmount. This
+ * happens regardless of whether the DLM got started, so we can't do it
+ * in ocfs2_cluster_disconnect(). We bring the o2hb_stop() function into
+ * the glue and provide a "hangup" API for super.c to call.
+ *
+ * Other stacks will eventually provide a NULL ->hangup() pointer.
+ */
+void ocfs2_cluster_hangup(const char *group, int grouplen)
+{
+ BUG_ON(group == NULL);
+ BUG_ON(group[grouplen] != '\0');
+
+ o2hb_stop(group);
+}
+
int ocfs2_cluster_this_node(unsigned int *node)
{
int node_num;
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index ccb0399..22af77b 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -74,6 +74,7 @@ int ocfs2_cluster_connect(const char *group,
void *recovery_data,
struct ocfs2_cluster_connection **conn);
int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn);
+void ocfs2_cluster_hangup(const char *group, int grouplen);
int ocfs2_cluster_this_node(unsigned int *node);

int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index d3c4d32..8f536b3 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -40,8 +40,7 @@
#include <linux/crc32.h>
#include <linux/debugfs.h>
#include <linux/mount.h>
-
-#include <cluster/nodemanager.h>
+#include <linux/seq_file.h>

#define MLOG_MASK_PREFIX ML_SUPER
#include <cluster/masklog.h>
@@ -579,15 +578,6 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
goto read_super_error;
}

- /* for now we only have one cluster/node, make sure we see it
- * in the heartbeat universe */
- if (parsed_options.mount_opt & OCFS2_MOUNT_HB_LOCAL) {
- if (!o2hb_check_local_node_heartbeating()) {
- status = -EINVAL;
- goto read_super_error;
- }
- }
-
/* probe for superblock */
status = ocfs2_sb_probe(sb, &bh, &sector_size);
if (status < 0) {
@@ -1275,8 +1265,15 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)

debugfs_remove(osb->osb_debug_root);

- if (!mnt_err)
- ocfs2_stop_heartbeat(osb);
+ /*
+ * This is a small hack to move ocfs2_hb_ctl into stackglue.
+ * If we're dismounting due to mount error, mount.ocfs2 will clean
+ * up heartbeat. If we're a local mount, there is no heartbeat.
+ * If we failed before we got a uuid_str, we can't stop
+ * heartbeat. Otherwise, do it.
+ */
+ if (!mnt_err && !ocfs2_mount_local(osb) && osb->uuid_str)
+ ocfs2_cluster_hangup(osb->uuid_str, strlen(osb->uuid_str));

atomic_set(&osb->vol_state, VOLUME_DISMOUNTED);

--
1.5.3.8

2008-03-06 00:32:43

by Joel Becker

Subject: [PATCH 16/18] ocfs2: Create stack glue sysfs files.

Introduce a set of sysfs files that describe the current stack glue
state. The files live under /sys/fs/ocfs2. The max_locking_protocol
file displays the maximum version of ocfs2's locking protocol. The
loaded_cluster_plugins file displays all of the currently loaded stack
plugins. When filesystems are mounted, the active_cluster_plugin file
displays the plugin in use.
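
For readers unfamiliar with multi-line sysfs output, the following is a
userspace sketch of one way to append several names, one per line, into
a single fixed-size buffer. format_plugin_list() and its parameters are
hypothetical and exist only for this illustration; the point is that
the write position advances by the running total and that snprintf()
reports truncation by returning a value greater than or equal to the
space it was given.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/*
 * Append each name followed by a newline to buf, which holds size bytes.
 * Returns the number of bytes written (not counting the trailing NUL),
 * or -1 if the output would not fit or snprintf() failed.
 */
static long format_plugin_list(char *buf, size_t size,
			       const char *const *names, size_t count)
{
	size_t total = 0;
	size_t i;

	assert(buf != NULL && size > 0);
	buf[0] = '\0';

	for (i = 0; i < count; i++) {
		size_t remain = size - total;
		/* Write after what is already there, not at the start. */
		int ret = snprintf(buf + total, remain, "%s\n", names[i]);

		if (ret < 0)
			return -1;
		if ((size_t)ret >= remain)
			return -1;	/* output was truncated */
		total += (size_t)ret;
	}

	return (long)total;
}
```

With the names "o2cb" and "user" and a large enough buffer, this yields
two lines and a length of 10.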

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/stackglue.c | 121 +++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 120 insertions(+), 1 deletions(-)

diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 1978c9c..76ae4fc 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -23,6 +23,9 @@
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/kmod.h>
+#include <linux/fs.h>
+#include <linux/kobject.h>
+#include <linux/sysfs.h>

#include "stackglue.h"

@@ -335,14 +338,130 @@ int ocfs2_cluster_this_node(unsigned int *node)
EXPORT_SYMBOL_GPL(ocfs2_cluster_this_node);


-static int __init ocfs2_stack_glue_init(void)
+/*
+ * Sysfs bits
+ */
+
+static ssize_t ocfs2_max_locking_protocol_show(struct kobject *kobj,
+ struct kobj_attribute *attr,
+ char *buf)
+{
+ ssize_t ret = 0;
+
+ spin_lock(&ocfs2_stack_lock);
+ if (lproto)
+ ret = snprintf(buf, PAGE_SIZE, "%u.%u\n",
+ lproto->lp_max_version.pv_major,
+ lproto->lp_max_version.pv_minor);
+ spin_unlock(&ocfs2_stack_lock);
+
+ return ret;
+}
+
+static struct kobj_attribute ocfs2_attr_max_locking_protocol =
+ __ATTR(max_locking_protocol, S_IFREG | S_IRUGO,
+ ocfs2_max_locking_protocol_show, NULL);
+
+static ssize_t ocfs2_loaded_cluster_plugins_show(struct kobject *kobj,
+ struct kobj_attribute *attr,
+ char *buf)
{
+ ssize_t ret = 0, total = 0, remain = PAGE_SIZE;
+ struct ocfs2_stack_plugin *p;
+
+ spin_lock(&ocfs2_stack_lock);
+ list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
+ ret = snprintf(buf + total, remain, "%s\n",
+ p->sp_name);
+ if (ret < 0) {
+ total = ret;
+ break;
+ }
+ if (ret >= remain) {
+ /* snprintf() didn't fit */
+ total = -E2BIG;
+ break;
+ }
+ total += ret;
+ remain -= ret;
+ }
+ spin_unlock(&ocfs2_stack_lock);
+
+ return total;
+}
+
+static struct kobj_attribute ocfs2_attr_loaded_cluster_plugins =
+ __ATTR(loaded_cluster_plugins, S_IFREG | S_IRUGO,
+ ocfs2_loaded_cluster_plugins_show, NULL);
+
+static ssize_t ocfs2_active_cluster_plugin_show(struct kobject *kobj,
+ struct kobj_attribute *attr,
+ char *buf)
+{
+ ssize_t ret = 0;
+
+ spin_lock(&ocfs2_stack_lock);
+ if (active_stack) {
+ ret = snprintf(buf, PAGE_SIZE, "%s\n",
+ active_stack->sp_name);
+ if (ret >= PAGE_SIZE)
+ ret = -E2BIG;
+ }
+ spin_unlock(&ocfs2_stack_lock);
+
+ return ret;
+}
+
+static struct kobj_attribute ocfs2_attr_active_cluster_plugin =
+ __ATTR(active_cluster_plugin, S_IFREG | S_IRUGO,
+ ocfs2_active_cluster_plugin_show, NULL);
+
+static struct attribute *ocfs2_attrs[] = {
+ &ocfs2_attr_max_locking_protocol.attr,
+ &ocfs2_attr_loaded_cluster_plugins.attr,
+ &ocfs2_attr_active_cluster_plugin.attr,
+ NULL,
+};
+
+static struct attribute_group ocfs2_attr_group = {
+ .attrs = ocfs2_attrs,
+};
+
+static struct kset *ocfs2_kset;
+
+static void ocfs2_sysfs_exit(void)
+{
+ kset_unregister(ocfs2_kset);
+}
+
+static int ocfs2_sysfs_init(void)
+{
+ int ret;
+
+ ocfs2_kset = kset_create_and_add("ocfs2", NULL, fs_kobj);
+ if (!ocfs2_kset)
+ return -ENOMEM;
+
+ ret = sysfs_create_group(&ocfs2_kset->kobj, &ocfs2_attr_group);
+ if (ret)
+ goto error;
+
return 0;
+
+error:
+ kset_unregister(ocfs2_kset);
+ return ret;
+}
+
+static int __init ocfs2_stack_glue_init(void)
+{
+ return ocfs2_sysfs_init();
}

static void __exit ocfs2_stack_glue_exit(void)
{
lproto = NULL;
+ ocfs2_sysfs_exit();
}

MODULE_AUTHOR("Oracle");
--
1.5.3.8

2008-03-06 00:33:18

by Joel Becker

Subject: [PATCH 04/18] ocfs2: Create the lock status block union.

Wrap the lock status block (lksb) in a union. Later we will add a union
element for the fs/dlm lksb. Create accessors for the status and lvb
fields.

Other than a debugging function, dlmglue.c does not directly reference
the o2dlm locking path anymore.
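
The wrapping pattern can be sketched in userspace as follows. All of
the mock_* types and functions are hypothetical stand-ins for the o2dlm
structures, and the status-to-errno mapping is deliberately simplified;
only the shape (a union with one member per stack, plus accessors that
hide the member name from callers) reflects this patch.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

#define MOCK_LVB_LEN 64	/* hypothetical size, not taken from o2dlm */

/* Hypothetical stand-in for o2dlm's struct dlm_lockstatus. */
struct mock_o2dlm_lksb {
	int status;		/* 0 means success in this mock */
	char lvb[MOCK_LVB_LEN];
};

/* One member per supported stack; only the o2dlm-like one exists here. */
union mock_lksb {
	struct mock_o2dlm_lksb lksb_o2dlm;
};

/* Simplified mapping: any nonzero stack status becomes -EINVAL. */
static int mock_status_to_errno(int status)
{
	return status == 0 ? 0 : -EINVAL;
}

/* Callers get an errno-style value and never see the stack's status type. */
static int mock_lock_status(union mock_lksb *lksb)
{
	assert(lksb != NULL);
	return mock_status_to_errno(lksb->lksb_o2dlm.status);
}

/* Returns the raw LVB; casting to a concrete type is left to the caller. */
static void *mock_lvb(union mock_lksb *lksb)
{
	assert(lksb != NULL);
	return (void *)lksb->lksb_o2dlm.lvb;
}
```

Because callers go through the accessors, adding a second union member
later should not require touching them.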

Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/dlmglue.c | 23 +++++++++++++----------
fs/ocfs2/ocfs2.h | 5 +++--
fs/ocfs2/stackglue.c | 29 ++++++++++++++++++++++-------
fs/ocfs2/stackglue.h | 11 +++++++++--
4 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 12a5213..0053945 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -112,7 +112,8 @@ static void ocfs2_dump_meta_lvb_info(u64 level,
unsigned int line,
struct ocfs2_lock_res *lockres)
{
- struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+ struct ocfs2_meta_lvb *lvb =
+ (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);

mlog(level, "LVB information for %s (called from %s:%u):\n",
lockres->l_name, function, line);
@@ -799,14 +800,14 @@ static void ocfs2_blocking_ast(void *opaque, int level)
static void ocfs2_locking_ast(void *opaque)
{
struct ocfs2_lock_res *lockres = opaque;
- struct dlm_lockstatus *lksb = &lockres->l_lksb;
unsigned long flags;

spin_lock_irqsave(&lockres->l_lock, flags);

- if (lksb->status != DLM_NORMAL) {
- mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
- lockres->l_name, lksb->status);
+ if (ocfs2_dlm_lock_status(&lockres->l_lksb)) {
+ mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
+ lockres->l_name,
+ ocfs2_dlm_lock_status(&lockres->l_lksb));
spin_unlock_irqrestore(&lockres->l_lock, flags);
return;
}
@@ -1634,7 +1635,7 @@ static void __ocfs2_stuff_meta_lvb(struct inode *inode)

mlog_entry_void();

- lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+ lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);

/*
* Invalidate the LVB of a deleted inode - this way other
@@ -1686,7 +1687,7 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode)

mlog_meta_lvb(0, lockres);

- lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+ lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);

/* We're safe here without the lockres lock... */
spin_lock(&oi->ip_lock);
@@ -1721,7 +1722,8 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
struct ocfs2_lock_res *lockres)
{
- struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+ struct ocfs2_meta_lvb *lvb =
+ (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);

if (lvb->lvb_version == OCFS2_LVB_VERSION
&& be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
@@ -2379,7 +2381,7 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
lockres->l_blocking);

/* Dump the raw LVB */
- lvb = lockres->l_lksb.lvb;
+ lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
for(i = 0; i < DLM_LVB_LEN; i++)
seq_printf(m, "0x%x\t", lvb[i]);

@@ -2692,7 +2694,8 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
- dlm_print_one_lock(lockres->l_lksb.lockid);
+ /* XXX Need to abstract this */
+ dlm_print_one_lock(lockres->l_lksb.lksb_o2dlm.lockid);
BUG();
}
mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n",
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index f78e9ed..6d7c6d2 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -40,7 +40,8 @@
#include "cluster/heartbeat.h"
#include "cluster/tcp.h"

-#include "dlm/dlmapi.h"
+/* For union ocfs2_dlm_lksb */
+#include "stackglue.h"

#include "ocfs2_fs.h"
#include "ocfs2_lockid.h"
@@ -120,7 +121,7 @@ struct ocfs2_lock_res {
int l_level;
unsigned int l_ro_holders;
unsigned int l_ex_holders;
- struct dlm_lockstatus l_lksb;
+ union ocfs2_dlm_lksb l_lksb;

/* used from AST/BAST funcs. */
enum ocfs2_ast_action l_action;
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 0aec2fc..eb88854 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -199,7 +199,7 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)

int ocfs2_dlm_lock(struct dlm_ctxt *dlm,
int mode,
- struct dlm_lockstatus *lksb,
+ union ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen,
@@ -212,15 +212,16 @@ int ocfs2_dlm_lock(struct dlm_ctxt *dlm,

BUG_ON(lproto == NULL);

- status = dlmlock(dlm, o2dlm_mode, lksb, o2dlm_flags, name, namelen,
- o2dlm_lock_ast_wrapper, astarg,
- o2dlm_blocking_ast_wrapper);
+ status = dlmlock(dlm, o2dlm_mode, &lksb->lksb_o2dlm, o2dlm_flags,
+ name, namelen,
+ o2dlm_lock_ast_wrapper, astarg,
+ o2dlm_blocking_ast_wrapper);
ret = dlm_status_to_errno(status);
return ret;
}

int ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
- struct dlm_lockstatus *lksb,
+ union ocfs2_dlm_lksb *lksb,
u32 flags,
void *astarg)
{
@@ -230,12 +231,26 @@ int ocfs2_dlm_unlock(struct dlm_ctxt *dlm,

BUG_ON(lproto == NULL);

- status = dlmunlock(dlm, lksb, o2dlm_flags,
- o2dlm_unlock_ast_wrapper, astarg);
+ status = dlmunlock(dlm, &lksb->lksb_o2dlm, o2dlm_flags,
+ o2dlm_unlock_ast_wrapper, astarg);
ret = dlm_status_to_errno(status);
return ret;
}

+int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
+{
+ return dlm_status_to_errno(lksb->lksb_o2dlm.status);
+}
+
+/*
+ * Why don't we cast to ocfs2_meta_lvb? The "clean" answer is that we
+ * don't cast at the glue level. The real answer is that the header
+ * ordering is nigh impossible.
+ */
+void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb)
+{
+ return (void *)(lksb->lksb_o2dlm.lvb);
+}

void o2cb_get_stack(struct ocfs2_locking_protocol *proto)
{
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index 8ebcfba..3c91e24 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -40,18 +40,25 @@ struct ocfs2_locking_protocol {
void (*lp_unlock_ast)(void *astarg, int error);
};

+union ocfs2_dlm_lksb {
+ struct dlm_lockstatus lksb_o2dlm;
+};
+
int ocfs2_dlm_lock(struct dlm_ctxt *dlm,
int mode,
- struct dlm_lockstatus *lksb,
+ union ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen,
void *astarg);
int ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
- struct dlm_lockstatus *lksb,
+ union ocfs2_dlm_lksb *lksb,
u32 flags,
void *astarg);

+int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb);
+void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb);
+
void o2cb_get_stack(struct ocfs2_locking_protocol *proto);
void o2cb_put_stack(void);

--
1.5.3.8

2008-03-06 00:33:52

by Joel Becker

Subject: [PATCH 18/18] ocfs2: Add the 'cluster_stack' sysfs file.

Userspace can now query and specify the cluster stack in use via the
/sys/fs/ocfs2/cluster_stack file. By default, it is 'o2cb', which is
the classic stack. Thus, old tools that do not know how to modify this
file will work just fine. The stack cannot be modified if there is a
live filesystem.

ocfs2_cluster_connect() now takes the expected cluster stack as an
argument. This way, the filesystem and the stack glue ensure they are
speaking to the same backend.

If the stack is 'o2cb', the o2cb stack plugin is used. For any other
value, the fsdlm stack plugin (named 'user' in the code) is selected.
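
The selection rule can be sketched in userspace like this. pick_plugin()
is a hypothetical helper written for illustration, and STACK_LABEL_LEN
is assumed to be 4 to match the length of the "o2cb" and "user" labels;
the real constant is OCFS2_STACK_LABEL_LEN from ocfs2_fs.h.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define STACK_LABEL_LEN 4	/* assumed value of OCFS2_STACK_LABEL_LEN */

/*
 * Map a cluster stack label to the name of the plugin that serves it.
 * An empty or missing label means the classic stack. Returns NULL if
 * the label has the wrong length.
 */
static const char *pick_plugin(const char *stack_name)
{
	if (!stack_name || !*stack_name)
		stack_name = "o2cb";

	if (strlen(stack_name) != STACK_LABEL_LEN)
		return NULL;

	/* Anything that isn't the classic stack is a user stack. */
	if (strcmp(stack_name, "o2cb") != 0)
		return "user";

	return "o2cb";
}
```

Note that the stack label and the plugin name are distinct: many labels
can map to the single "user" plugin, which is why the request function
in this patch takes both.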

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/dlmglue.c | 3 +-
fs/ocfs2/stackglue.c | 111 +++++++++++++++++++++++++++++++++++++++++++++-----
fs/ocfs2/stackglue.h | 3 +-
3 files changed, 104 insertions(+), 13 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 71af7d6..737f0b2 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -2627,7 +2627,8 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
}

/* for now, uuid == domain */
- status = ocfs2_cluster_connect(osb->uuid_str,
+ status = ocfs2_cluster_connect(osb->osb_cluster_stack,
+ osb->uuid_str,
strlen(osb->uuid_str),
ocfs2_do_node_down, osb,
&conn);
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 76ae4fc..bf45d9b 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -27,11 +27,17 @@
#include <linux/kobject.h>
#include <linux/sysfs.h>

+#include "ocfs2_fs.h"
+
#include "stackglue.h"

+#define OCFS2_STACK_PLUGIN_O2CB "o2cb"
+#define OCFS2_STACK_PLUGIN_USER "user"
+
static struct ocfs2_locking_protocol *lproto;
static DEFINE_SPINLOCK(ocfs2_stack_lock);
static LIST_HEAD(ocfs2_stack_list);
+static char cluster_stack_name[OCFS2_STACK_LABEL_LEN + 1];

/*
* The stack currently in use. If not null, active_stack->sp_count > 0,
@@ -53,26 +59,36 @@ static struct ocfs2_stack_plugin *ocfs2_stack_lookup(const char *name)
return NULL;
}

-static int ocfs2_stack_driver_request(const char *name)
+static int ocfs2_stack_driver_request(const char *stack_name,
+ const char *plugin_name)
{
int rc;
struct ocfs2_stack_plugin *p;

spin_lock(&ocfs2_stack_lock);

+ /*
+ * If the stack passed by the filesystem isn't the selected one,
+ * we can't continue.
+ */
+ if (strcmp(stack_name, cluster_stack_name)) {
+ rc = -EBUSY;
+ goto out;
+ }
+
if (active_stack) {
/*
* If the active stack isn't the one we want, it cannot
* be selected right now.
*/
- if (!strcmp(active_stack->sp_name, name))
+ if (!strcmp(active_stack->sp_name, plugin_name))
rc = 0;
else
rc = -EBUSY;
goto out;
}

- p = ocfs2_stack_lookup(name);
+ p = ocfs2_stack_lookup(plugin_name);
if (!p || !try_module_get(p->sp_owner)) {
rc = -ENOENT;
goto out;
@@ -94,23 +110,42 @@ out:
* there is no stack, it tries to load it. It will fail if the stack still
* cannot be found. It will also fail if a different stack is in use.
*/
-static int ocfs2_stack_driver_get(const char *name)
+static int ocfs2_stack_driver_get(const char *stack_name)
{
int rc;
+ char *plugin_name = OCFS2_STACK_PLUGIN_O2CB;
+
+ /*
+ * Classic stack does not pass in a stack name. This is
+ * compatible with older tools as well.
+ */
+ if (!stack_name || !*stack_name)
+ stack_name = OCFS2_STACK_PLUGIN_O2CB;
+
+ if (strlen(stack_name) != OCFS2_STACK_LABEL_LEN) {
+ printk(KERN_ERR
+ "ocfs2 passed an invalid cluster stack label: \"%s\"\n",
+ stack_name);
+ return -EINVAL;
+ }

- rc = ocfs2_stack_driver_request(name);
+ /* Anything that isn't the classic stack is a user stack */
+ if (strcmp(stack_name, OCFS2_STACK_PLUGIN_O2CB))
+ plugin_name = OCFS2_STACK_PLUGIN_USER;
+
+ rc = ocfs2_stack_driver_request(stack_name, plugin_name);
if (rc == -ENOENT) {
- request_module("ocfs2_stack_%s", name);
- rc = ocfs2_stack_driver_request(name);
+ request_module("ocfs2_stack_%s", plugin_name);
+ rc = ocfs2_stack_driver_request(stack_name, plugin_name);
}

if (rc == -ENOENT) {
printk(KERN_ERR
"ocfs2: Cluster stack driver \"%s\" cannot be found\n",
- name);
+ plugin_name);
} else if (rc == -EBUSY) {
printk(KERN_ERR
- "ocfs2: A different cluster stack driver is in use\n");
+ "ocfs2: A different cluster stack is in use\n");
}

return rc;
@@ -242,7 +277,8 @@ void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_dump_lksb);

-int ocfs2_cluster_connect(const char *group,
+int ocfs2_cluster_connect(const char *stack_name,
+ const char *group,
int grouplen,
void (*recovery_handler)(int node_num,
void *recovery_data),
@@ -277,7 +313,7 @@ int ocfs2_cluster_connect(const char *group,
new_conn->cc_version = lproto->lp_max_version;

/* This will pin the stack driver if successful */
- rc = ocfs2_stack_driver_get("o2cb");
+ rc = ocfs2_stack_driver_get(stack_name);
if (rc)
goto out_free;

@@ -416,10 +452,61 @@ static struct kobj_attribute ocfs2_attr_active_cluster_plugin =
__ATTR(active_cluster_plugin, S_IFREG | S_IRUGO,
ocfs2_active_cluster_plugin_show, NULL);

+static ssize_t ocfs2_cluster_stack_show(struct kobject *kobj,
+ struct kobj_attribute *attr,
+ char *buf)
+{
+ ssize_t ret;
+ spin_lock(&ocfs2_stack_lock);
+ ret = snprintf(buf, PAGE_SIZE, "%s\n", cluster_stack_name);
+ spin_unlock(&ocfs2_stack_lock);
+
+ return ret;
+}
+
+static ssize_t ocfs2_cluster_stack_store(struct kobject *kobj,
+ struct kobj_attribute *attr,
+ const char *buf, size_t count)
+{
+ size_t len = count;
+ ssize_t ret;
+
+ if (len == 0)
+ return len;
+
+ if (buf[len - 1] == '\n')
+ len--;
+
+ if ((len != OCFS2_STACK_LABEL_LEN) ||
+ (strnlen(buf, len) != len))
+ return -EINVAL;
+
+ spin_lock(&ocfs2_stack_lock);
+ if (active_stack) {
+ if (!strncmp(buf, cluster_stack_name, len))
+ ret = count;
+ else
+ ret = -EBUSY;
+ } else {
+ memcpy(cluster_stack_name, buf, len);
+ ret = count;
+ }
+ spin_unlock(&ocfs2_stack_lock);
+
+ return ret;
+}
+
+
+static struct kobj_attribute ocfs2_attr_cluster_stack =
+ __ATTR(cluster_stack, S_IFREG | S_IRUGO | S_IWUSR,
+ ocfs2_cluster_stack_show,
+ ocfs2_cluster_stack_store);
+
static struct attribute *ocfs2_attrs[] = {
&ocfs2_attr_max_locking_protocol.attr,
&ocfs2_attr_loaded_cluster_plugins.attr,
&ocfs2_attr_active_cluster_plugin.attr,
+ &ocfs2_attr_cluster_stack.attr,
NULL,
};

@@ -455,6 +542,8 @@ error:

static int __init ocfs2_stack_glue_init(void)
{
+ strcpy(cluster_stack_name, OCFS2_STACK_PLUGIN_O2CB);
+
return ocfs2_sysfs_init();
}

diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index c96c8bb..d88bc65 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -209,7 +209,8 @@ struct ocfs2_stack_plugin {


/* Used by the filesystem */
-int ocfs2_cluster_connect(const char *group,
+int ocfs2_cluster_connect(const char *stack_name,
+ const char *group,
int grouplen,
void (*recovery_handler)(int node_num,
void *recovery_data),
--
1.5.3.8

2008-03-06 00:34:29

by Joel Becker

Subject: [PATCH 01/18] ocfs2: Separate out dlm lock functions.

This is the first in a series of patches to isolate ocfs2 from the
underlying cluster stack. Here we wrap the dlm locking functions with
ocfs2-specific calls. Because ocfs2 always uses the same dlm lock status
callbacks, we can eliminate the callbacks from the filesystem visible
functions.
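
The wrapping idea can be sketched in user space as follows. This is only an illustration, not the kernel code: raw_lock(), glue_set_protocol(), glue_lock() and the demo_* names are hypothetical stand-ins, and the "lock" is granted immediately so the callback path is visible.

```c
#include <assert.h>
#include <stddef.h>

typedef void (*lock_ast_fn)(void *astarg);
typedef void (*blocking_ast_fn)(void *astarg, int level);

/*
 * Hypothetical stand-in for a raw DLM entry point that wants the
 * callbacks on every call. It "grants" the lock right away.
 */
static int raw_lock(int mode, lock_ast_fn ast, void *astarg,
		    blocking_ast_fn bast)
{
	(void)mode;
	(void)bast;
	ast(astarg);
	return 0;
}

/* Same shape as the callback table in the patch, minus unlock. */
struct locking_protocol {
	lock_ast_fn lp_lock_ast;
	blocking_ast_fn lp_blocking_ast;
};

static struct locking_protocol *lproto;

/* The filesystem hands over its callbacks once, at init time. */
void glue_set_protocol(struct locking_protocol *proto)
{
	assert(proto != NULL);
	lproto = proto;
}

/* Callers no longer pass callbacks; the wrapper supplies them. */
int glue_lock(int mode, void *astarg)
{
	assert(lproto != NULL);
	return raw_lock(mode, lproto->lp_lock_ast, astarg,
			lproto->lp_blocking_ast);
}

/* Demo callbacks: the lock AST counts grants through astarg. */
static void demo_lock_ast(void *astarg)
{
	*(int *)astarg += 1;
}

static void demo_blocking_ast(void *astarg, int level)
{
	(void)astarg;
	(void)level;
}

static struct locking_protocol demo_proto = {
	.lp_lock_ast = demo_lock_ast,
	.lp_blocking_ast = demo_blocking_ast,
};

/* Registers the protocol, takes two locks, returns the grant count. */
int demo_grant_count(void)
{
	int granted = 0;

	glue_set_protocol(&demo_proto);
	glue_lock(0, &granted);
	glue_lock(0, &granted);
	return granted;
}
```

The point of the shape is that call sites shrink to the arguments that actually vary, while the callback set lives in one place.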

Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/Makefile | 1 +
fs/ocfs2/dlmglue.c | 110 +++++++++++++++++++++++++++----------------------
fs/ocfs2/dlmglue.h | 3 +
fs/ocfs2/stackglue.c | 65 +++++++++++++++++++++++++++++
fs/ocfs2/stackglue.h | 45 ++++++++++++++++++++
fs/ocfs2/super.c | 4 ++
6 files changed, 179 insertions(+), 49 deletions(-)
create mode 100644 fs/ocfs2/stackglue.c
create mode 100644 fs/ocfs2/stackglue.h

diff --git a/fs/ocfs2/Makefile b/fs/ocfs2/Makefile
index 4d4ce48..3ba64af 100644
--- a/fs/ocfs2/Makefile
+++ b/fs/ocfs2/Makefile
@@ -24,6 +24,7 @@ ocfs2-objs := \
namei.o \
resize.o \
slot_map.o \
+ stackglue.o \
suballoc.o \
super.o \
symlink.o \
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index b4108fe..1320410 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -53,6 +53,7 @@
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
+#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
@@ -888,22 +889,21 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
spin_unlock_irqrestore(&lockres->l_lock, flags);

- status = dlmlock(osb->dlm,
- level,
- &lockres->l_lksb,
- dlm_flags,
- lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- ocfs2_locking_ast,
- lockres,
- ocfs2_blocking_ast);
+ status = ocfs2_dlm_lock(osb->dlm,
+ level,
+ &lockres->l_lksb,
+ dlm_flags,
+ lockres->l_name,
+ OCFS2_LOCK_ID_MAX_LEN - 1,
+ lockres);
if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("dlmlock", status, lockres);
+ ocfs2_log_dlm_error("ocfs2_dlm_lock", status, lockres);
ret = -EINVAL;
ocfs2_recover_from_dlm_error(lockres, 1);
}

- mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
+ mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
+ lockres->l_name);

bail:
mlog_exit(ret);
@@ -1091,29 +1091,27 @@ again:
lockres->l_name, lockres->l_level, level);

/* call dlm_lock to upgrade lock now */
- status = dlmlock(osb->dlm,
- level,
- &lockres->l_lksb,
- lkm_flags,
- lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- ocfs2_locking_ast,
- lockres,
- ocfs2_blocking_ast);
+ status = ocfs2_dlm_lock(osb->dlm,
+ level,
+ &lockres->l_lksb,
+ lkm_flags,
+ lockres->l_name,
+ OCFS2_LOCK_ID_MAX_LEN - 1,
+ lockres);
if (status != DLM_NORMAL) {
if ((lkm_flags & LKM_NOQUEUE) &&
(status == DLM_NOTQUEUED))
ret = -EAGAIN;
else {
- ocfs2_log_dlm_error("dlmlock", status,
- lockres);
+ ocfs2_log_dlm_error("ocfs2_dlm_lock",
+ status, lockres);
ret = -EINVAL;
}
ocfs2_recover_from_dlm_error(lockres, 1);
goto out;
}

- mlog(0, "lock %s, successfull return from dlmlock\n",
+ mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
lockres->l_name);

/* At this point we've gone inside the dlm and need to
@@ -1503,14 +1501,14 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
spin_unlock_irqrestore(&lockres->l_lock, flags);

- ret = dlmlock(osb->dlm, level, &lockres->l_lksb, lkm_flags,
- lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
- ocfs2_locking_ast, lockres, ocfs2_blocking_ast);
+ ret = ocfs2_dlm_lock(osb->dlm, level, &lockres->l_lksb, lkm_flags,
+ lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
+ lockres);
if (ret != DLM_NORMAL) {
if (trylock && ret == DLM_NOTQUEUED)
ret = -EAGAIN;
else {
- ocfs2_log_dlm_error("dlmlock", ret, lockres);
+ ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
ret = -EINVAL;
}

@@ -2699,15 +2697,15 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,

mlog(0, "lock %s\n", lockres->l_name);

- status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
- ocfs2_unlock_ast, lockres);
+ status = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, lkm_flags,
+ lockres);
if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("dlmunlock", status, lockres);
+ ocfs2_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
dlm_print_one_lock(lockres->l_lksb.lockid);
BUG();
}
- mlog(0, "lock %s, successfull return from dlmunlock\n",
+ mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
lockres->l_name);

ocfs2_wait_on_busy_lock(lockres);
@@ -2832,17 +2830,15 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
if (lvb)
dlm_flags |= LKM_VALBLK;

- status = dlmlock(osb->dlm,
- new_level,
- &lockres->l_lksb,
- dlm_flags,
- lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- ocfs2_locking_ast,
- lockres,
- ocfs2_blocking_ast);
+ status = ocfs2_dlm_lock(osb->dlm,
+ new_level,
+ &lockres->l_lksb,
+ dlm_flags,
+ lockres->l_name,
+ OCFS2_LOCK_ID_MAX_LEN - 1,
+ lockres);
if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("dlmlock", status, lockres);
+ ocfs2_log_dlm_error("ocfs2_dlm_lock", status, lockres);
ret = -EINVAL;
ocfs2_recover_from_dlm_error(lockres, 1);
goto bail;
@@ -2854,7 +2850,7 @@ bail:
return ret;
}

-/* returns 1 when the caller should unlock and call dlmunlock */
+/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
@@ -2896,18 +2892,17 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
mlog(0, "lock %s\n", lockres->l_name);

ret = 0;
- status = dlmunlock(osb->dlm,
- &lockres->l_lksb,
- LKM_CANCEL,
- ocfs2_unlock_ast,
- lockres);
+ status = ocfs2_dlm_unlock(osb->dlm,
+ &lockres->l_lksb,
+ LKM_CANCEL,
+ lockres);
if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("dlmunlock", status, lockres);
+ ocfs2_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
ret = -EINVAL;
ocfs2_recover_from_dlm_error(lockres, 0);
}

- mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
+ mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);

mlog_exit(ret);
return ret;
@@ -3211,6 +3206,23 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
return UNBLOCK_CONTINUE_POST;
}

+static struct ocfs2_locking_protocol lproto = {
+ .lp_lock_ast = ocfs2_locking_ast,
+ .lp_blocking_ast = ocfs2_blocking_ast,
+ .lp_unlock_ast = ocfs2_unlock_ast,
+};
+
+/* This interface isn't the final one, hence the less-than-perfect names */
+void dlmglue_init_stack(void)
+{
+ o2cb_get_stack(&lproto);
+}
+
+void dlmglue_exit_stack(void)
+{
+ o2cb_put_stack();
+}
+
static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
index e3cf902..3238043 100644
--- a/fs/ocfs2/dlmglue.h
+++ b/fs/ocfs2/dlmglue.h
@@ -114,5 +114,8 @@ void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb);
struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void);
void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug);

+void dlmglue_init_stack(void);
+void dlmglue_exit_stack(void);
+
extern const struct dlm_protocol_version ocfs2_locking_protocol;
#endif /* DLMGLUE_H */
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
new file mode 100644
index 0000000..4f44f23
--- /dev/null
+++ b/fs/ocfs2/stackglue.c
@@ -0,0 +1,65 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * stackglue.c
+ *
+ * Code which implements an OCFS2 specific interface to underlying
+ * cluster stacks.
+ *
+ * Copyright (C) 2007 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation, version 2.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ */
+
+#include <linux/types.h>
+#include <linux/list.h>
+
+#include "dlm/dlmapi.h"
+
+#include "stackglue.h"
+
+static struct ocfs2_locking_protocol *lproto;
+
+enum dlm_status ocfs2_dlm_lock(struct dlm_ctxt *dlm,
+ int mode,
+ struct dlm_lockstatus *lksb,
+ u32 flags,
+ void *name,
+ unsigned int namelen,
+ void *astarg)
+{
+ BUG_ON(lproto == NULL);
+ return dlmlock(dlm, mode, lksb, flags, name, namelen,
+ lproto->lp_lock_ast, astarg,
+ lproto->lp_blocking_ast);
+}
+
+enum dlm_status ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
+ struct dlm_lockstatus *lksb,
+ u32 flags,
+ void *astarg)
+{
+ BUG_ON(lproto == NULL);
+
+ return dlmunlock(dlm, lksb, flags, lproto->lp_unlock_ast, astarg);
+}
+
+
+void o2cb_get_stack(struct ocfs2_locking_protocol *proto)
+{
+ BUG_ON(proto == NULL);
+
+ lproto = proto;
+}
+
+void o2cb_put_stack(void)
+{
+ lproto = NULL;
+}
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
new file mode 100644
index 0000000..40a0024
--- /dev/null
+++ b/fs/ocfs2/stackglue.h
@@ -0,0 +1,45 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * stackglue.h
+ *
+ * Glue to the underlying cluster stack.
+ *
+ * Copyright (C) 2007 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation, version 2.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ */
+
+
+#ifndef STACKGLUE_H
+#define STACKGLUE_H
+
+struct ocfs2_locking_protocol {
+ void (*lp_lock_ast)(void *astarg);
+ void (*lp_blocking_ast)(void *astarg, int level);
+ void (*lp_unlock_ast)(void *astarg, enum dlm_status status);
+};
+
+enum dlm_status ocfs2_dlm_lock(struct dlm_ctxt *dlm,
+ int mode,
+ struct dlm_lockstatus *lksb,
+ u32 flags,
+ void *name,
+ unsigned int namelen,
+ void *astarg);
+enum dlm_status ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
+ struct dlm_lockstatus *lksb,
+ u32 flags,
+ void *astarg);
+
+void o2cb_get_stack(struct ocfs2_locking_protocol *proto);
+void o2cb_put_stack(void);
+
+#endif /* STACKGLUE_H */
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index 1a4c7c7..c867546 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -933,6 +933,8 @@ static int __init ocfs2_init(void)

ocfs2_print_version();

+ dlmglue_init_stack();
+
status = init_ocfs2_uptodate_cache();
if (status < 0) {
mlog_errno(status);
@@ -988,6 +990,8 @@ static void __exit ocfs2_exit(void)

exit_ocfs2_uptodate_cache();

+ dlmglue_exit_stack();
+
mlog_exit_void();
}

--
1.5.3.8

2008-03-06 00:34:53

by Joel Becker

Subject: [PATCH 15/18] ocfs2: Break out stackglue into modules.

We define the ocfs2_stack_plugin structure to represent a stack driver.
The o2cb stack code is split into stack_o2cb.c. This becomes the
ocfs2_stack_o2cb.ko module.

The stackglue generic functions are similarly split into the
ocfs2_stackglue.ko module. This module now provides an interface to
register drivers. The ocfs2_stack_o2cb driver registers itself. As
part of this interface, ocfs2_stackglue can load drivers on demand.
This is accomplished in ocfs2_cluster_connect().
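
A rough user-space model of the register-then-load-on-demand flow is sketched below. It is an assumption-laden illustration rather than the patch itself: the list is a plain singly linked list with no locking, the loader callback is a hypothetical stand-in for request_module(), and the sketch chooses to count every successful request as one reference.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>

struct stack_plugin {
	const char *sp_name;
	unsigned int sp_count;		/* references held by connections */
	struct stack_plugin *sp_next;	/* registry linkage */
};

static struct stack_plugin *stack_list;
static struct stack_plugin *active_stack;

static struct stack_plugin *stack_lookup(const char *name)
{
	struct stack_plugin *p;

	for (p = stack_list; p; p = p->sp_next)
		if (!strcmp(p->sp_name, name))
			return p;
	return NULL;
}

/* A driver calls this from its init routine. */
int stack_register(struct stack_plugin *plugin)
{
	if (stack_lookup(plugin->sp_name))
		return -EEXIST;
	plugin->sp_count = 0;
	plugin->sp_next = stack_list;
	stack_list = plugin;
	return 0;
}

/* Pin the named stack, or fail if it is missing or another is active. */
int stack_request(const char *name)
{
	struct stack_plugin *p;

	if (active_stack) {
		if (strcmp(active_stack->sp_name, name))
			return -EBUSY;
		p = active_stack;
	} else {
		p = stack_lookup(name);
		if (!p)
			return -ENOENT;
		active_stack = p;
	}
	p->sp_count++;	/* sketch assumption: one reference per request */
	return 0;
}

/* Hypothetical loader hook standing in for request_module(). */
typedef void (*module_loader_fn)(const char *name);

/* Try the registry first; load the driver and retry if it is absent. */
int stack_get(const char *name, module_loader_fn load)
{
	int rc = stack_request(name);

	if (rc == -ENOENT && load) {
		load(name);
		rc = stack_request(name);
	}
	return rc;
}

/* Demo driver that registers itself when "loaded". */
static struct stack_plugin demo_o2cb = { .sp_name = "o2cb" };

void demo_loader(const char *name)
{
	if (!strcmp(name, "o2cb"))
		stack_register(&demo_o2cb);
}
```

With this shape, the first connection pays the cost of loading the driver, and a request for any other stack name is refused while one stack is active.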

ocfs2_cluster_disconnect() is now notified when a _hangup() is pending.
If a hangup is pending, it will not release the driver module and will
let _hangup() do that.
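
The hand-off of the driver reference between disconnect and hangup can be modelled as below. This is a simplified user-space sketch: a bare counter stands in for the module reference, and driver_get(), driver_put(), cluster_disconnect(), cluster_hangup() and demo_dismount() are hypothetical names chosen for the illustration.

```c
#include <assert.h>

/* Models the reference pinning the stack driver module. */
static unsigned int pin_count;

void driver_get(void)
{
	pin_count++;
}

void driver_put(void)
{
	assert(pin_count > 0);
	pin_count--;
}

unsigned int driver_pins(void)
{
	return pin_count;
}

/*
 * Tear down a connection. If a hangup will follow, keep the
 * reference so the driver stays loaded until hangup has run.
 */
void cluster_disconnect(int hangup_pending)
{
	if (!hangup_pending)
		driver_put();
}

/* Only valid after cluster_disconnect(1); drops the deferred reference. */
void cluster_hangup(void)
{
	driver_put();
}

/* Walks one connect/dismount cycle and returns the remaining pins. */
unsigned int demo_dismount(int hangup_needed)
{
	driver_get();			/* taken at connect time */
	cluster_disconnect(hangup_needed);
	if (hangup_needed)
		cluster_hangup();
	return driver_pins();
}
```

Either path releases the reference exactly once, which is the property the hangup_pending flag exists to preserve.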

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/Makefile | 7 +-
fs/ocfs2/dlmglue.c | 7 +-
fs/ocfs2/dlmglue.h | 2 +-
fs/ocfs2/stack_o2cb.c | 41 +++++++--
fs/ocfs2/stackglue.c | 238 ++++++++++++++++++++++++++++++++++++++++++++-----
fs/ocfs2/stackglue.h | 36 +++++++-
fs/ocfs2/super.c | 16 ++--
7 files changed, 297 insertions(+), 50 deletions(-)

diff --git a/fs/ocfs2/Makefile b/fs/ocfs2/Makefile
index 8e86195..b734254 100644
--- a/fs/ocfs2/Makefile
+++ b/fs/ocfs2/Makefile
@@ -2,7 +2,7 @@ EXTRA_CFLAGS += -Ifs/ocfs2

EXTRA_CFLAGS += -DCATCH_BH_JBD_RACES

-obj-$(CONFIG_OCFS2_FS) += ocfs2.o
+obj-$(CONFIG_OCFS2_FS) += ocfs2.o ocfs2_stackglue.o ocfs2_stack_o2cb.o

ocfs2-objs := \
alloc.o \
@@ -24,8 +24,6 @@ ocfs2-objs := \
namei.o \
resize.o \
slot_map.o \
- stackglue.o \
- stack_o2cb.o \
suballoc.o \
super.o \
symlink.o \
@@ -33,5 +31,8 @@ ocfs2-objs := \
uptodate.o \
ver.o

+ocfs2_stackglue-objs := stackglue.o
+ocfs2_stack_o2cb-objs := stack_o2cb.o
+
obj-$(CONFIG_OCFS2_FS) += cluster/
obj-$(CONFIG_OCFS2_FS) += dlm/
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 435d7e0..71af7d6 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -2641,7 +2641,7 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
mlog_errno(status);
mlog(ML_ERROR,
"could not find this host's node number\n");
- ocfs2_cluster_disconnect(conn);
+ ocfs2_cluster_disconnect(conn, 0);
goto bail;
}

@@ -2663,7 +2663,8 @@ bail:
return status;
}

-void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
+void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
+ int hangup_pending)
{
mlog_entry_void();

@@ -2683,7 +2684,7 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
ocfs2_lock_res_free(&osb->osb_super_lockres);
ocfs2_lock_res_free(&osb->osb_rename_lockres);

- ocfs2_cluster_disconnect(osb->cconn);
+ ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
osb->cconn = NULL;

ocfs2_dlm_shutdown_debug(osb);
diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
index 34b7598..2bb01f0 100644
--- a/fs/ocfs2/dlmglue.h
+++ b/fs/ocfs2/dlmglue.h
@@ -58,7 +58,7 @@ struct ocfs2_meta_lvb {
#define OCFS2_LOCK_NONBLOCK (0x04)

int ocfs2_dlm_init(struct ocfs2_super *osb);
-void ocfs2_dlm_shutdown(struct ocfs2_super *osb);
+void ocfs2_dlm_shutdown(struct ocfs2_super *osb, int hangup_pending);
void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res);
void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
enum ocfs2_lock_type type,
diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c
index c9bc354..ac1d74c 100644
--- a/fs/ocfs2/stack_o2cb.c
+++ b/fs/ocfs2/stack_o2cb.c
@@ -18,7 +18,7 @@
*/

#include <linux/crc32.h>
-#include <linux/kmod.h>
+#include <linux/module.h>

/* Needed for AOP_TRUNCATED_PAGE in mlog_errno() */
#include <linux/fs.h>
@@ -33,6 +33,8 @@ struct o2dlm_private {
struct dlm_eviction_cb op_eviction_cb;
};

+static struct ocfs2_stack_plugin o2cb_stack;
+
/* These should be identical */
#if (DLM_LOCK_IV != LKM_IVMODE)
# error Lock modes do not match
@@ -158,23 +160,23 @@ static int dlm_status_to_errno(enum dlm_status status)

static void o2dlm_lock_ast_wrapper(void *astarg)
{
- BUG_ON(stack_glue_lproto == NULL);
+ BUG_ON(o2cb_stack.sp_proto == NULL);

- stack_glue_lproto->lp_lock_ast(astarg);
+ o2cb_stack.sp_proto->lp_lock_ast(astarg);
}

static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
{
- BUG_ON(stack_glue_lproto == NULL);
+ BUG_ON(o2cb_stack.sp_proto == NULL);

- stack_glue_lproto->lp_blocking_ast(astarg, level);
+ o2cb_stack.sp_proto->lp_blocking_ast(astarg, level);
}

static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
{
int error = dlm_status_to_errno(status);

- BUG_ON(stack_glue_lproto == NULL);
+ BUG_ON(o2cb_stack.sp_proto == NULL);

/*
* In o2dlm, you can get both the lock_ast() for the lock being
@@ -190,7 +192,7 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
if (status == DLM_CANCELGRANT)
return;

- stack_glue_lproto->lp_unlock_ast(astarg, error);
+ o2cb_stack.sp_proto->lp_unlock_ast(astarg, error);
}

static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
@@ -267,6 +269,7 @@ static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn)
struct dlm_protocol_version dlm_version;

BUG_ON(conn == NULL);
+ BUG_ON(o2cb_stack.sp_proto == NULL);

/* for now we only have one cluster/node, make sure we see it
* in the heartbeat universe */
@@ -314,7 +317,8 @@ out:
return rc;
}

-static int o2cb_cluster_disconnect(struct ocfs2_cluster_connection *conn)
+static int o2cb_cluster_disconnect(struct ocfs2_cluster_connection *conn,
+ int hangup_pending)
{
struct dlm_ctxt *dlm = conn->cc_lockspace;
struct o2dlm_private *priv = conn->cc_private;
@@ -393,3 +397,24 @@ struct ocfs2_stack_operations o2cb_stack_ops = {
.dump_lksb = o2cb_dump_lksb,
};

+static struct ocfs2_stack_plugin o2cb_stack = {
+ .sp_name = "o2cb",
+ .sp_ops = &o2cb_stack_ops,
+ .sp_owner = THIS_MODULE,
+};
+
+static int __init o2cb_stack_init(void)
+{
+ return ocfs2_stack_glue_register(&o2cb_stack);
+}
+
+static void __exit o2cb_stack_exit(void)
+{
+ ocfs2_stack_glue_unregister(&o2cb_stack);
+}
+
+MODULE_AUTHOR("Oracle");
+MODULE_DESCRIPTION("ocfs2 driver for the classic o2cb stack");
+MODULE_LICENSE("GPL");
+module_init(o2cb_stack_init);
+module_exit(o2cb_stack_exit);
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index e197367..1978c9c 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -18,17 +18,176 @@
* General Public License for more details.
*/

+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/module.h>
#include <linux/slab.h>
#include <linux/kmod.h>

-/* Needed for AOP_TRUNCATED_PAGE in mlog_errno() */
-#include <linux/fs.h>
+#include "stackglue.h"

-#include "cluster/masklog.h"
+static struct ocfs2_locking_protocol *lproto;
+static DEFINE_SPINLOCK(ocfs2_stack_lock);
+static LIST_HEAD(ocfs2_stack_list);

-#include "stackglue.h"
+/*
+ * The stack currently in use. If not null, active_stack->sp_count > 0,
+ * the module is pinned, and the locking protocol cannot be changed.
+ */
+static struct ocfs2_stack_plugin *active_stack;
+
+static struct ocfs2_stack_plugin *ocfs2_stack_lookup(const char *name)
+{
+ struct ocfs2_stack_plugin *p;
+
+ assert_spin_locked(&ocfs2_stack_lock);
+
+ list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
+ if (!strcmp(p->sp_name, name))
+ return p;
+ }
+
+ return NULL;
+}
+
+static int ocfs2_stack_driver_request(const char *name)
+{
+ int rc;
+ struct ocfs2_stack_plugin *p;
+
+ spin_lock(&ocfs2_stack_lock);
+
+ if (active_stack) {
+ /*
+ * If the active stack isn't the one we want, it cannot
+ * be selected right now.
+ */
+ if (!strcmp(active_stack->sp_name, name))
+ rc = 0;
+ else
+ rc = -EBUSY;
+ goto out;
+ }
+
+ p = ocfs2_stack_lookup(name);
+ if (!p || !try_module_get(p->sp_owner)) {
+ rc = -ENOENT;
+ goto out;
+ }
+
+ /* Ok, the stack is pinned */
+ p->sp_count++;
+ active_stack = p;
+
+ rc = 0;
+
+out:
+ spin_unlock(&ocfs2_stack_lock);
+ return rc;
+}
+
+/*
+ * This function looks up the appropriate stack and makes it active. If
+ * there is no stack, it tries to load it. It will fail if the stack still
+ * cannot be found. It will also fail if a different stack is in use.
+ */
+static int ocfs2_stack_driver_get(const char *name)
+{
+ int rc;
+
+ rc = ocfs2_stack_driver_request(name);
+ if (rc == -ENOENT) {
+ request_module("ocfs2_stack_%s", name);
+ rc = ocfs2_stack_driver_request(name);
+ }
+
+ if (rc == -ENOENT) {
+ printk(KERN_ERR
+ "ocfs2: Cluster stack driver \"%s\" cannot be found\n",
+ name);
+ } else if (rc == -EBUSY) {
+ printk(KERN_ERR
+ "ocfs2: A different cluster stack driver is in use\n");
+ }
+
+ return rc;
+}

-struct ocfs2_locking_protocol *stack_glue_lproto;
+static void ocfs2_stack_driver_put(void)
+{
+ spin_lock(&ocfs2_stack_lock);
+ BUG_ON(active_stack == NULL);
+ BUG_ON(active_stack->sp_count == 0);
+
+ active_stack->sp_count--;
+ if (!active_stack->sp_count) {
+ module_put(active_stack->sp_owner);
+ active_stack = NULL;
+ }
+ spin_unlock(&ocfs2_stack_lock);
+}
+
+int ocfs2_stack_glue_register(struct ocfs2_stack_plugin *plugin)
+{
+ int rc;
+
+ spin_lock(&ocfs2_stack_lock);
+ if (!ocfs2_stack_lookup(plugin->sp_name)) {
+ plugin->sp_count = 0;
+ plugin->sp_proto = lproto;
+ list_add(&plugin->sp_list, &ocfs2_stack_list);
+ printk(KERN_INFO "ocfs2: Registered cluster interface %s\n",
+ plugin->sp_name);
+ rc = 0;
+ } else {
+ printk(KERN_ERR "ocfs2: Stack \"%s\" already registered\n",
+ plugin->sp_name);
+ rc = -EEXIST;
+ }
+ spin_unlock(&ocfs2_stack_lock);
+
+ return rc;
+}
+EXPORT_SYMBOL_GPL(ocfs2_stack_glue_register);
+
+void ocfs2_stack_glue_unregister(struct ocfs2_stack_plugin *plugin)
+{
+ struct ocfs2_stack_plugin *p;
+
+ spin_lock(&ocfs2_stack_lock);
+ p = ocfs2_stack_lookup(plugin->sp_name);
+ if (p) {
+ BUG_ON(p != plugin);
+ BUG_ON(plugin == active_stack);
+ BUG_ON(plugin->sp_count != 0);
+ list_del_init(&plugin->sp_list);
+ printk(KERN_INFO "ocfs2: Unregistered cluster interface %s\n",
+ plugin->sp_name);
+ } else {
+ printk(KERN_ERR "Stack \"%s\" is not registered\n",
+ plugin->sp_name);
+ }
+ spin_unlock(&ocfs2_stack_lock);
+}
+EXPORT_SYMBOL_GPL(ocfs2_stack_glue_unregister);
+
+void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)
+{
+ struct ocfs2_stack_plugin *p;
+
+ BUG_ON(proto == NULL);
+
+ spin_lock(&ocfs2_stack_lock);
+ BUG_ON(active_stack != NULL);
+
+ lproto = proto;
+ list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
+ p->sp_proto = lproto;
+ }
+
+ spin_unlock(&ocfs2_stack_lock);
+}
+EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol);


int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
@@ -39,26 +198,29 @@ int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
unsigned int namelen,
void *astarg)
{
- BUG_ON(stack_glue_lproto == NULL);
+ BUG_ON(lproto == NULL);

- return o2cb_stack_ops.dlm_lock(conn, mode, lksb, flags,
- name, namelen, astarg);
+ return active_stack->sp_ops->dlm_lock(conn, mode, lksb, flags,
+ name, namelen, astarg);
}
+EXPORT_SYMBOL_GPL(ocfs2_dlm_lock);

int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *astarg)
{
- BUG_ON(stack_glue_lproto == NULL);
+ BUG_ON(lproto == NULL);

- return o2cb_stack_ops.dlm_unlock(conn, lksb, flags, astarg);
+ return active_stack->sp_ops->dlm_unlock(conn, lksb, flags, astarg);
}
+EXPORT_SYMBOL_GPL(ocfs2_dlm_unlock);

int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
{
- return o2cb_stack_ops.lock_status(lksb);
+ return active_stack->sp_ops->lock_status(lksb);
}
+EXPORT_SYMBOL_GPL(ocfs2_dlm_lock_status);

/*
* Why don't we cast to ocfs2_meta_lvb? The "clean" answer is that we
@@ -67,13 +229,15 @@ int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
*/
void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb)
{
- return o2cb_stack_ops.lock_lvb(lksb);
+ return active_stack->sp_ops->lock_lvb(lksb);
}
+EXPORT_SYMBOL_GPL(ocfs2_dlm_lvb);

void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
{
- o2cb_stack_ops.dump_lksb(lksb);
+ active_stack->sp_ops->dump_lksb(lksb);
}
+EXPORT_SYMBOL_GPL(ocfs2_dlm_dump_lksb);

int ocfs2_cluster_connect(const char *group,
int grouplen,
@@ -107,11 +271,16 @@ int ocfs2_cluster_connect(const char *group,
new_conn->cc_recovery_data = recovery_data;

/* Start the new connection at our maximum compatibility level */
- new_conn->cc_version = stack_glue_lproto->lp_max_version;
+ new_conn->cc_version = lproto->lp_max_version;
+
+ /* This will pin the stack driver if successful */
+ rc = ocfs2_stack_driver_get("o2cb");
+ if (rc)
+ goto out_free;

- rc = o2cb_stack_ops.connect(new_conn);
+ rc = active_stack->sp_ops->connect(new_conn);
if (rc) {
- mlog_errno(rc);
+ ocfs2_stack_driver_put();
goto out_free;
}

@@ -124,39 +293,60 @@ out_free:
out:
return rc;
}
+EXPORT_SYMBOL_GPL(ocfs2_cluster_connect);

-int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn)
+/* If hangup_pending is 0, the stack driver will be dropped */
+int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn,
+ int hangup_pending)
{
int ret;

BUG_ON(conn == NULL);

- ret = o2cb_stack_ops.disconnect(conn);
+ ret = active_stack->sp_ops->disconnect(conn, hangup_pending);

/* XXX Should we free it anyway? */
- if (!ret)
+ if (!ret) {
kfree(conn);
+ if (!hangup_pending)
+ ocfs2_stack_driver_put();
+ }

return ret;
}
+EXPORT_SYMBOL_GPL(ocfs2_cluster_disconnect);

void ocfs2_cluster_hangup(const char *group, int grouplen)
{
BUG_ON(group == NULL);
BUG_ON(group[grouplen] != '\0');

- o2cb_stack_ops.hangup(group, grouplen);
+ active_stack->sp_ops->hangup(group, grouplen);
+
+ /* cluster_disconnect() was called with hangup_pending==1 */
+ ocfs2_stack_driver_put();
}
+EXPORT_SYMBOL_GPL(ocfs2_cluster_hangup);

int ocfs2_cluster_this_node(unsigned int *node)
{
- return o2cb_stack_ops.this_node(node);
+ return active_stack->sp_ops->this_node(node);
}
+EXPORT_SYMBOL_GPL(ocfs2_cluster_this_node);

-void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)
+
+static int __init ocfs2_stack_glue_init(void)
{
- BUG_ON(proto != NULL);
+ return 0;
+}

- stack_glue_lproto = proto;
+static void __exit ocfs2_stack_glue_exit(void)
+{
+ lproto = NULL;
}

+MODULE_AUTHOR("Oracle");
+MODULE_DESCRIPTION("ocfs2 cluster stack glue layer");
+MODULE_LICENSE("GPL");
+module_init(ocfs2_stack_glue_init);
+module_exit(ocfs2_stack_glue_exit);
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index 0836322..c96c8bb 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -119,14 +119,21 @@ struct ocfs2_stack_operations {
* Once ->disconnect() has returned, the connection structure will
* be freed. Thus, a stack must not return from ->disconnect()
* until it will no longer reference the conn pointer.
+ *
+ * If hangup_pending is zero, ocfs2_cluster_disconnect() will also
+ * be dropping the reference on the module.
*/
- int (*disconnect)(struct ocfs2_cluster_connection *conn);
+ int (*disconnect)(struct ocfs2_cluster_connection *conn,
+ int hangup_pending);

/*
* ocfs2_cluster_hangup() exists for compatibility with older
* ocfs2 tools. Only the classic stack really needs it. As such
* ->hangup() is not required of all stacks. See the comment by
* ocfs2_cluster_hangup() for more details.
+ *
+ * Note that ocfs2_cluster_hangup() can only be called if
+ * hangup_pending was passed to ocfs2_cluster_disconnect().
*/
void (*hangup)(const char *group, int grouplen);

@@ -184,13 +191,32 @@ struct ocfs2_stack_operations {
void (*dump_lksb)(union ocfs2_dlm_lksb *lksb);
};

+/*
+ * Each stack plugin must describe itself by registering an
+ * ocfs2_stack_plugin structure. This is only seen by stackglue and the
+ * stack driver.
+ */
+struct ocfs2_stack_plugin {
+ char *sp_name;
+ struct ocfs2_stack_operations *sp_ops;
+ struct module *sp_owner;
+
+ /* These are managed by the stackglue code. */
+ struct list_head sp_list;
+ unsigned int sp_count;
+ struct ocfs2_locking_protocol *sp_proto;
+};
+
+
+/* Used by the filesystem */
int ocfs2_cluster_connect(const char *group,
int grouplen,
void (*recovery_handler)(int node_num,
void *recovery_data),
void *recovery_data,
struct ocfs2_cluster_connection **conn);
-int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn);
+int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn,
+ int hangup_pending);
void ocfs2_cluster_hangup(const char *group, int grouplen);
int ocfs2_cluster_this_node(unsigned int *node);

@@ -212,6 +238,8 @@ void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb);

void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto);

-extern struct ocfs2_locking_protocol *stack_glue_lproto;
-extern struct ocfs2_stack_operations o2cb_stack_ops;
+
+/* Used by stack plugins */
+int ocfs2_stack_glue_register(struct ocfs2_stack_plugin *plugin);
+void ocfs2_stack_glue_unregister(struct ocfs2_stack_plugin *plugin);
#endif /* STACKGLUE_H */
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index b4a02a0..e27a0d4 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -1186,7 +1186,7 @@ leave:

static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
{
- int tmp;
+ int tmp, hangup_needed = 0;
struct ocfs2_super *osb = NULL;
char nodestr[8];

@@ -1225,19 +1225,21 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)

ocfs2_release_system_inodes(osb);

- if (osb->cconn)
- ocfs2_dlm_shutdown(osb);
-
- debugfs_remove(osb->osb_debug_root);
-
/*
- * This is a small hack to move ocfs2_hb_ctl into stackglue.
* If we're dismounting due to mount error, mount.ocfs2 will clean
* up heartbeat. If we're a local mount, there is no heartbeat.
* If we failed before we got a uuid_str yet, we can't stop
* heartbeat. Otherwise, do it.
*/
if (!mnt_err && !ocfs2_mount_local(osb) && osb->uuid_str)
+ hangup_needed = 1;
+
+ if (osb->cconn)
+ ocfs2_dlm_shutdown(osb, hangup_needed);
+
+ debugfs_remove(osb->osb_debug_root);
+
+ if (hangup_needed)
ocfs2_cluster_hangup(osb->uuid_str, strlen(osb->uuid_str));

atomic_set(&osb->vol_state, VOLUME_DISMOUNTED);
--
1.5.3.8

2008-03-06 00:35:52

by Joel Becker

Subject: [PATCH 06/18] ocfs2: Abstract out node number queries.

ocfs2 asks the cluster stack for the local node's node number for two
reasons: to fill the slot map and to print it. While the slot map isn't
necessary for userspace cluster stacks, the printing is very nice for
debugging. Thus we add ocfs2_cluster_this_node() as a generic API to get
this value. It is anticipated that the slot map will not be used under a
userspace cluster stack, so validity checks of the node num only need to
exist in the slot map code. Otherwise, it just gets used and printed as an
opaque value.
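
As a rough illustration of such a query, the user-space sketch below validates the backend's answer once and hands callers a plain unsigned value. The DEMO_* limits, backend_this_node() and demo_query() are hypothetical; the real limits come from the nodemanager headers.

```c
#include <assert.h>
#include <errno.h>

/* Hypothetical limits, chosen only for this sketch. */
#define DEMO_MAX_NODES		255
#define DEMO_INVALID_NODE_NUM	255

/* Stand-in for whatever the cluster stack reports. */
static int demo_backend_node;

static int backend_this_node(void)
{
	return demo_backend_node;
}

/*
 * Generic query: check the backend's answer here, so that callers
 * can treat *node as an opaque number. *node is left untouched on error.
 */
int cluster_this_node(unsigned int *node)
{
	int node_num = backend_this_node();

	if (node_num == DEMO_INVALID_NODE_NUM)
		return -ENOENT;
	if (node_num >= DEMO_MAX_NODES)
		return -EOVERFLOW;

	*node = (unsigned int)node_num;
	return 0;
}

/* Helper for experiments: set the backend value, then query it. */
int demo_query(int backend_value, unsigned int *node)
{
	demo_backend_node = backend_value;
	return cluster_this_node(node);
}
```

Keeping the range checks inside the query is what lets the rest of the filesystem print or store the number without knowing which stack produced it.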

[ Fixed up some "int" versus "unsigned int" issues and made osb->node_num
truly opaque. --Mark ]

Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/ocfs2.h | 2 +-
fs/ocfs2/slot_map.c | 2 --
fs/ocfs2/stackglue.c | 17 +++++++++++++++++
fs/ocfs2/stackglue.h | 1 +
fs/ocfs2/super.c | 22 +++++++++++-----------
5 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 664e4fe..7006aba 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -218,7 +218,7 @@ struct ocfs2_super
unsigned int s_atime_quantum;

unsigned int max_slots;
- s16 node_num;
+ unsigned int node_num;
int slot_num;
int preferred_slot;
int s_sectsize_bits;
diff --git a/fs/ocfs2/slot_map.c b/fs/ocfs2/slot_map.c
index 63fb1b2..bb5ff89 100644
--- a/fs/ocfs2/slot_map.c
+++ b/fs/ocfs2/slot_map.c
@@ -73,8 +73,6 @@ static void ocfs2_set_slot(struct ocfs2_slot_info *si,
int slot_num, unsigned int node_num)
{
BUG_ON((slot_num < 0) || (slot_num >= si->si_num_slots));
- BUG_ON((node_num == O2NM_INVALID_NODE_NUM) ||
- (node_num >= O2NM_MAX_NODES));

si->si_slots[slot_num].sl_valid = 1;
si->si_slots[slot_num].sl_node_num = node_num;
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index f6f309a..8146863 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -25,6 +25,8 @@
#include <linux/fs.h>

#include "cluster/masklog.h"
+#include "cluster/nodemanager.h"
+
#include "stackglue.h"

static struct ocfs2_locking_protocol *lproto;
@@ -371,6 +373,21 @@ int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn)
return 0;
}

+int ocfs2_cluster_this_node(unsigned int *node)
+{
+ int node_num;
+
+ node_num = o2nm_this_node();
+ if (node_num == O2NM_INVALID_NODE_NUM)
+ return -ENOENT;
+
+ if (node_num >= O2NM_MAX_NODES)
+ return -EOVERFLOW;
+
+ *node = node_num;
+ return 0;
+}
+
void o2cb_get_stack(struct ocfs2_locking_protocol *proto)
{
BUG_ON(proto == NULL);
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index 3900b5c..ccb0399 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -74,6 +74,7 @@ int ocfs2_cluster_connect(const char *group,
void *recovery_data,
struct ocfs2_cluster_connection **conn);
int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn);
+int ocfs2_cluster_this_node(unsigned int *node);

int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index 0ee4975..d3c4d32 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -694,7 +694,7 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
if (ocfs2_mount_local(osb))
snprintf(nodestr, sizeof(nodestr), "local");
else
- snprintf(nodestr, sizeof(nodestr), "%d", osb->node_num);
+ snprintf(nodestr, sizeof(nodestr), "%u", osb->node_num);

printk(KERN_INFO "ocfs2: Mounting device (%s) on (node %s, slot %d) "
"with %s data mode.\n",
@@ -1145,16 +1145,17 @@ static int ocfs2_fill_local_node_info(struct ocfs2_super *osb)
* desirable. */
if (ocfs2_mount_local(osb))
osb->node_num = 0;
- else
- osb->node_num = o2nm_this_node();
-
- if (osb->node_num == O2NM_MAX_NODES) {
- mlog(ML_ERROR, "could not find this host's node number\n");
- status = -ENOENT;
- goto bail;
+ else {
+ status = ocfs2_cluster_this_node(&osb->node_num);
+ if (status < 0) {
+ mlog_errno(status);
+ mlog(ML_ERROR,
+ "could not find this host's node number\n");
+ goto bail;
+ }
}

- mlog(0, "I am node %d\n", osb->node_num);
+ mlog(0, "I am node %u\n", osb->node_num);

status = 0;
bail:
@@ -1282,7 +1283,7 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
if (ocfs2_mount_local(osb))
snprintf(nodestr, sizeof(nodestr), "local");
else
- snprintf(nodestr, sizeof(nodestr), "%d", osb->node_num);
+ snprintf(nodestr, sizeof(nodestr), "%u", osb->node_num);

printk(KERN_INFO "ocfs2: Unmounting device (%s) on (node %s)\n",
osb->dev_str, nodestr);
@@ -1384,7 +1385,6 @@ static int ocfs2_initialize_super(struct super_block *sb,

osb->s_atime_quantum = OCFS2_DEFAULT_ATIME_QUANTUM;

- osb->node_num = O2NM_INVALID_NODE_NUM;
osb->slot_num = OCFS2_INVALID_SLOT;

osb->local_alloc_state = OCFS2_LA_UNUSED;
--
1.5.3.8

2008-03-06 00:36:21

by Joel Becker

Subject: [PATCH 02/18] ocfs2: Use global DLM_ constants in generic code.

The ocfs2 generic code should use the values in <linux/dlmconstants.h>.
stackglue.c will convert them to o2dlm values.
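
The conversion can be pictured with a small userspace sketch. It translates each generic flag bit to its backend counterpart, clears the bit, and rejects anything left over. All names and flag values here are hypothetical and chosen only for illustration; the patch itself uses a `map_flag()` macro and `BUG_ON()` where this sketch uses a table and returns `-EINVAL`.

```c
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical generic flag values, for this sketch only. */
#define GEN_LKF_NOQUEUE 0x00000001u
#define GEN_LKF_CONVERT 0x00000004u
#define GEN_LKF_VALBLK  0x00000008u

/* Hypothetical backend flag values, for this sketch only. */
#define BACKEND_NOQUEUE 0x00000100
#define BACKEND_CONVERT 0x00000200
#define BACKEND_VALBLK  0x00000400

struct flag_pair {
	uint32_t generic;
	int backend;
};

static const struct flag_pair flag_table[] = {
	{ GEN_LKF_NOQUEUE, BACKEND_NOQUEUE },
	{ GEN_LKF_CONVERT, BACKEND_CONVERT },
	{ GEN_LKF_VALBLK,  BACKEND_VALBLK  },
};

/* Translate generic flags; fail if any bit has no known mapping. */
int sketch_flags_to_backend(uint32_t flags, int *backend_flags)
{
	int out = 0;
	size_t i;

	for (i = 0; i < sizeof(flag_table) / sizeof(flag_table[0]); i++) {
		if (flags & flag_table[i].generic) {
			flags &= ~flag_table[i].generic;
			out |= flag_table[i].backend;
		}
	}

	/* Every mapped bit was cleared above; leftovers are unknown. */
	if (flags != 0)
		return -EINVAL;

	*backend_flags = out;
	return 0;
}
```

Clearing each bit as it is mapped is what makes the final check meaningful: a flag added to the generic set without a matching table entry is caught at the first call rather than silently dropped.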

Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/dlmglue.c | 140 +++++++++++++++++++++++++-------------------------
fs/ocfs2/stackglue.c | 71 +++++++++++++++++++++++---
fs/ocfs2/stackglue.h | 13 +++++
3 files changed, 147 insertions(+), 77 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 1320410..5806d53 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -37,8 +37,6 @@
#include <cluster/nodemanager.h>
#include <cluster/tcp.h>

-#include <dlm/dlmapi.h>
-
#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

@@ -317,7 +315,7 @@ static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *l
static int ocfs2_lock_create(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int level,
- int dlm_flags);
+ u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
@@ -407,9 +405,9 @@ static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
res->l_ops = ops;
res->l_priv = priv;

- res->l_level = LKM_IVMODE;
- res->l_requested = LKM_IVMODE;
- res->l_blocking = LKM_IVMODE;
+ res->l_level = DLM_LOCK_IV;
+ res->l_requested = DLM_LOCK_IV;
+ res->l_blocking = DLM_LOCK_IV;
res->l_action = OCFS2_AST_INVALID;
res->l_unlock_action = OCFS2_UNLOCK_INVALID;

@@ -605,10 +603,10 @@ static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
BUG_ON(!lockres);

switch(level) {
- case LKM_EXMODE:
+ case DLM_LOCK_EX:
lockres->l_ex_holders++;
break;
- case LKM_PRMODE:
+ case DLM_LOCK_PR:
lockres->l_ro_holders++;
break;
default:
@@ -626,11 +624,11 @@ static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
BUG_ON(!lockres);

switch(level) {
- case LKM_EXMODE:
+ case DLM_LOCK_EX:
BUG_ON(!lockres->l_ex_holders);
lockres->l_ex_holders--;
break;
- case LKM_PRMODE:
+ case DLM_LOCK_PR:
BUG_ON(!lockres->l_ro_holders);
lockres->l_ro_holders--;
break;
@@ -645,12 +643,12 @@ static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
* lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
- int new_level = LKM_EXMODE;
+ int new_level = DLM_LOCK_EX;

- if (level == LKM_EXMODE)
- new_level = LKM_NLMODE;
- else if (level == LKM_PRMODE)
- new_level = LKM_PRMODE;
+ if (level == DLM_LOCK_EX)
+ new_level = DLM_LOCK_NL;
+ else if (level == DLM_LOCK_PR)
+ new_level = DLM_LOCK_PR;
return new_level;
}

@@ -689,12 +687,12 @@ static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
- BUG_ON(lockres->l_blocking <= LKM_NLMODE);
+ BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

lockres->l_level = lockres->l_requested;
if (lockres->l_level <=
ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
- lockres->l_blocking = LKM_NLMODE;
+ lockres->l_blocking = DLM_LOCK_NL;
lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
}
lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
@@ -713,7 +711,7 @@ static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lo
* information is already up to data. Convert from NL to
* *anything* however should mark ourselves as needing an
* update */
- if (lockres->l_level == LKM_NLMODE &&
+ if (lockres->l_level == DLM_LOCK_NL &&
lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

@@ -730,7 +728,7 @@ static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *loc
BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

- if (lockres->l_requested > LKM_NLMODE &&
+ if (lockres->l_requested > DLM_LOCK_NL &&
!(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
@@ -775,7 +773,7 @@ static void ocfs2_blocking_ast(void *opaque, int level)
int needs_downconvert;
unsigned long flags;

- BUG_ON(level <= LKM_NLMODE);
+ BUG_ON(level <= DLM_LOCK_NL);

mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
lockres->l_name, level, lockres->l_level,
@@ -866,7 +864,7 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
static int ocfs2_lock_create(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int level,
- int dlm_flags)
+ u32 dlm_flags)
{
int ret = 0;
enum dlm_status status = DLM_NORMAL;
@@ -874,7 +872,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,

mlog_entry_void();

- mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
+ mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
dlm_flags);

spin_lock_irqsave(&lockres->l_lock, flags);
@@ -1016,7 +1014,7 @@ static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int level,
- int lkm_flags,
+ u32 lkm_flags,
int arg_flags)
{
struct ocfs2_mask_waiter mw;
@@ -1030,7 +1028,7 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,
ocfs2_init_mask_waiter(&mw);

if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
- lkm_flags |= LKM_VALBLK;
+ lkm_flags |= DLM_LKF_VALBLK;

again:
wait = 0;
@@ -1074,18 +1072,18 @@ again:

if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
lockres->l_action = OCFS2_AST_ATTACH;
- lkm_flags &= ~LKM_CONVERT;
+ lkm_flags &= ~DLM_LKF_CONVERT;
} else {
lockres->l_action = OCFS2_AST_CONVERT;
- lkm_flags |= LKM_CONVERT;
+ lkm_flags |= DLM_LKF_CONVERT;
}

lockres->l_requested = level;
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
spin_unlock_irqrestore(&lockres->l_lock, flags);

- BUG_ON(level == LKM_IVMODE);
- BUG_ON(level == LKM_NLMODE);
+ BUG_ON(level == DLM_LOCK_IV);
+ BUG_ON(level == DLM_LOCK_NL);

mlog(0, "lock %s, convert from %d to level = %d\n",
lockres->l_name, lockres->l_level, level);
@@ -1099,7 +1097,7 @@ again:
OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
if (status != DLM_NORMAL) {
- if ((lkm_flags & LKM_NOQUEUE) &&
+ if ((lkm_flags & DLM_LKF_NOQUEUE) &&
(status == DLM_NOTQUEUED))
ret = -EAGAIN;
else {
@@ -1175,9 +1173,9 @@ static int ocfs2_create_new_lock(struct ocfs2_super *osb,
int ex,
int local)
{
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
unsigned long flags;
- int lkm_flags = local ? LKM_LOCAL : 0;
+ u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;

spin_lock_irqsave(&lockres->l_lock, flags);
BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
@@ -1220,7 +1218,7 @@ int ocfs2_create_new_inode_locks(struct inode *inode)
}

/*
- * We don't want to use LKM_LOCAL on a meta data lock as they
+ * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
* don't use a generation in their lock names.
*/
ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
@@ -1259,7 +1257,7 @@ int ocfs2_rw_lock(struct inode *inode, int write)

lockres = &OCFS2_I(inode)->ip_rw_lockres;

- level = write ? LKM_EXMODE : LKM_PRMODE;
+ level = write ? DLM_LOCK_EX : DLM_LOCK_PR;

status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
0);
@@ -1272,7 +1270,7 @@ int ocfs2_rw_lock(struct inode *inode, int write)

void ocfs2_rw_unlock(struct inode *inode, int write)
{
- int level = write ? LKM_EXMODE : LKM_PRMODE;
+ int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

@@ -1310,7 +1308,7 @@ int ocfs2_open_lock(struct inode *inode)
lockres = &OCFS2_I(inode)->ip_open_lockres;

status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
- LKM_PRMODE, 0, 0);
+ DLM_LOCK_PR, 0, 0);
if (status < 0)
mlog_errno(status);

@@ -1338,16 +1336,16 @@ int ocfs2_try_open_lock(struct inode *inode, int write)

lockres = &OCFS2_I(inode)->ip_open_lockres;

- level = write ? LKM_EXMODE : LKM_PRMODE;
+ level = write ? DLM_LOCK_EX : DLM_LOCK_PR;

/*
* The file system may already holding a PRMODE/EXMODE open lock.
- * Since we pass LKM_NOQUEUE, the request won't block waiting on
+ * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
* other nodes and the -EAGAIN will indicate to the caller that
* this inode is still in use.
*/
status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
- level, LKM_NOQUEUE, 0);
+ level, DLM_LKF_NOQUEUE, 0);

out:
mlog_exit(status);
@@ -1372,10 +1370,10 @@ void ocfs2_open_unlock(struct inode *inode)

if(lockres->l_ro_holders)
ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
- LKM_PRMODE);
+ DLM_LOCK_PR);
if(lockres->l_ex_holders)
ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
- LKM_EXMODE);
+ DLM_LOCK_EX);

out:
mlog_exit_void();
@@ -1462,7 +1460,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
ocfs2_init_mask_waiter(&mw);

if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
- (lockres->l_level > LKM_NLMODE)) {
+ (lockres->l_level > DLM_LOCK_NL)) {
mlog(ML_ERROR,
"File lock \"%s\" has busy or locked state: flags: 0x%lx, "
"level: %u\n", lockres->l_name, lockres->l_flags,
@@ -1570,7 +1568,7 @@ void ocfs2_file_unlock(struct file *file)
* Fake a blocking ast for the downconvert code.
*/
lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
- lockres->l_blocking = LKM_EXMODE;
+ lockres->l_blocking = DLM_LOCK_EX;

ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
@@ -1599,11 +1597,11 @@ static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
* condition. */
if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
switch(lockres->l_blocking) {
- case LKM_EXMODE:
+ case DLM_LOCK_EX:
if (!lockres->l_ex_holders && !lockres->l_ro_holders)
kick = 1;
break;
- case LKM_PRMODE:
+ case DLM_LOCK_PR:
if (!lockres->l_ex_holders)
kick = 1;
break;
@@ -1921,7 +1919,8 @@ int ocfs2_inode_lock_full(struct inode *inode,
int ex,
int arg_flags)
{
- int status, level, dlm_flags, acquired;
+ int status, level, acquired;
+ u32 dlm_flags;
struct ocfs2_lock_res *lockres = NULL;
struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
struct buffer_head *local_bh = NULL;
@@ -1951,10 +1950,10 @@ int ocfs2_inode_lock_full(struct inode *inode,
ocfs2_wait_for_recovery(osb);

lockres = &OCFS2_I(inode)->ip_inode_lockres;
- level = ex ? LKM_EXMODE : LKM_PRMODE;
+ level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
dlm_flags = 0;
if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
- dlm_flags |= LKM_NOQUEUE;
+ dlm_flags |= DLM_LKF_NOQUEUE;

status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
if (status < 0) {
@@ -2105,7 +2104,7 @@ int ocfs2_inode_lock_atime(struct inode *inode,
void ocfs2_inode_unlock(struct inode *inode,
int ex)
{
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

@@ -2126,7 +2125,7 @@ int ocfs2_super_lock(struct ocfs2_super *osb,
int ex)
{
int status = 0;
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;

mlog_entry_void();
@@ -2168,7 +2167,7 @@ bail:
void ocfs2_super_unlock(struct ocfs2_super *osb,
int ex)
{
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;

if (!ocfs2_mount_local(osb))
@@ -2186,7 +2185,7 @@ int ocfs2_rename_lock(struct ocfs2_super *osb)
if (ocfs2_mount_local(osb))
return 0;

- status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
+ status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
if (status < 0)
mlog_errno(status);

@@ -2198,13 +2197,13 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb)
struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;

if (!ocfs2_mount_local(osb))
- ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
+ ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
}

int ocfs2_dentry_lock(struct dentry *dentry, int ex)
{
int ret;
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);

@@ -2225,7 +2224,7 @@ int ocfs2_dentry_lock(struct dentry *dentry, int ex)

void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
{
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);

@@ -2614,7 +2613,7 @@ static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
lockres->l_action = OCFS2_AST_INVALID;
break;
case OCFS2_UNLOCK_DROP_LOCK:
- lockres->l_level = LKM_IVMODE;
+ lockres->l_level = DLM_LOCK_IV;
break;
default:
BUG();
@@ -2635,14 +2634,14 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
{
enum dlm_status status;
unsigned long flags;
- int lkm_flags = 0;
+ u32 lkm_flags = 0;

/* We didn't get anywhere near actually using this lockres. */
if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
goto out;

if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
- lkm_flags |= LKM_VALBLK;
+ lkm_flags |= DLM_LKF_VALBLK;

spin_lock_irqsave(&lockres->l_lock, flags);

@@ -2668,7 +2667,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,

if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
- lockres->l_level == LKM_EXMODE &&
+ lockres->l_level == DLM_LOCK_EX &&
!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
lockres->l_ops->set_lvb(lockres);
}
@@ -2801,10 +2800,10 @@ static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
{
assert_spin_locked(&lockres->l_lock);

- BUG_ON(lockres->l_blocking <= LKM_NLMODE);
+ BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

if (lockres->l_level <= new_level) {
- mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
+ mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
lockres->l_level, new_level);
BUG();
}
@@ -2822,13 +2821,14 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
int new_level,
int lvb)
{
- int ret, dlm_flags = LKM_CONVERT;
+ int ret;
+ u32 dlm_flags = DLM_LKF_CONVERT;
enum dlm_status status;

mlog_entry_void();

if (lvb)
- dlm_flags |= LKM_VALBLK;
+ dlm_flags |= DLM_LKF_VALBLK;

status = ocfs2_dlm_lock(osb->dlm,
new_level,
@@ -2894,7 +2894,7 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
ret = 0;
status = ocfs2_dlm_unlock(osb->dlm,
&lockres->l_lksb,
- LKM_CANCEL,
+ DLM_LKF_CANCEL,
lockres);
if (status != DLM_NORMAL) {
ocfs2_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
@@ -2939,13 +2939,13 @@ recheck:

/* if we're blocking an exclusive and we have *any* holders,
* then requeue. */
- if ((lockres->l_blocking == LKM_EXMODE)
+ if ((lockres->l_blocking == DLM_LOCK_EX)
&& (lockres->l_ex_holders || lockres->l_ro_holders))
goto leave_requeue;

/* If it's a PR we're blocking, then only
* requeue if we've got any EX holders */
- if (lockres->l_blocking == LKM_PRMODE &&
+ if (lockres->l_blocking == DLM_LOCK_PR &&
lockres->l_ex_holders)
goto leave_requeue;

@@ -2992,7 +2992,7 @@ downconvert:
ctl->requeue = 0;

if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
- if (lockres->l_level == LKM_EXMODE)
+ if (lockres->l_level == DLM_LOCK_EX)
set_lvb = 1;

/*
@@ -3046,7 +3046,7 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
(unsigned long long)OCFS2_I(inode)->ip_blkno);
}
sync_mapping_buffers(mapping);
- if (blocking == LKM_EXMODE) {
+ if (blocking == DLM_LOCK_EX) {
truncate_inode_pages(mapping, 0);
} else {
/* We only need to wait on the I/O if we're not also
@@ -3067,8 +3067,8 @@ static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
struct inode *inode = ocfs2_lock_res_inode(lockres);
int checkpointed = ocfs2_inode_fully_checkpointed(inode);

- BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
- BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
+ BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
+ BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);

if (checkpointed)
return 1;
@@ -3132,7 +3132,7 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
* valid. The downconvert code will retain a PR for this node,
* so there's no further work to do.
*/
- if (blocking == LKM_PRMODE)
+ if (blocking == DLM_LOCK_PR)
return UNBLOCK_CONTINUE;

/*
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 4f44f23..9953804 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -18,15 +18,65 @@
* General Public License for more details.
*/

-#include <linux/types.h>
-#include <linux/list.h>
-
-#include "dlm/dlmapi.h"
-
#include "stackglue.h"

static struct ocfs2_locking_protocol *lproto;

+/* These should be identical */
+#if (DLM_LOCK_IV != LKM_IVMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_NL != LKM_NLMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_CR != LKM_CRMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_CW != LKM_CWMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_PR != LKM_PRMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_PW != LKM_PWMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_EX != LKM_EXMODE)
+# error Lock modes do not match
+#endif
+static inline int mode_to_o2dlm(int mode)
+{
+ BUG_ON(mode > LKM_MAXMODE);
+
+ return mode;
+}
+
+#define map_flag(_generic, _o2dlm) \
+ if (flags & (_generic)) { \
+ flags &= ~(_generic); \
+ o2dlm_flags |= (_o2dlm); \
+ }
+static int flags_to_o2dlm(u32 flags)
+{
+ int o2dlm_flags = 0;
+
+ map_flag(DLM_LKF_NOQUEUE, LKM_NOQUEUE);
+ map_flag(DLM_LKF_CANCEL, LKM_CANCEL);
+ map_flag(DLM_LKF_CONVERT, LKM_CONVERT);
+ map_flag(DLM_LKF_VALBLK, LKM_VALBLK);
+ map_flag(DLM_LKF_IVVALBLK, LKM_INVVALBLK);
+ map_flag(DLM_LKF_ORPHAN, LKM_ORPHAN);
+ map_flag(DLM_LKF_FORCEUNLOCK, LKM_FORCE);
+ map_flag(DLM_LKF_TIMEOUT, LKM_TIMEOUT);
+ map_flag(DLM_LKF_LOCAL, LKM_LOCAL);
+
+ /* map_flag() should have cleared every flag passed in */
+ BUG_ON(flags != 0);
+
+ return o2dlm_flags;
+}
+#undef map_flag
+
enum dlm_status ocfs2_dlm_lock(struct dlm_ctxt *dlm,
int mode,
struct dlm_lockstatus *lksb,
@@ -35,8 +85,12 @@ enum dlm_status ocfs2_dlm_lock(struct dlm_ctxt *dlm,
unsigned int namelen,
void *astarg)
{
+ int o2dlm_mode = mode_to_o2dlm(mode);
+ int o2dlm_flags = flags_to_o2dlm(flags);
+
BUG_ON(lproto == NULL);
- return dlmlock(dlm, mode, lksb, flags, name, namelen,
+
+ return dlmlock(dlm, o2dlm_mode, lksb, o2dlm_flags, name, namelen,
lproto->lp_lock_ast, astarg,
lproto->lp_blocking_ast);
}
@@ -46,9 +100,12 @@ enum dlm_status ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
u32 flags,
void *astarg)
{
+ int o2dlm_flags = flags_to_o2dlm(flags);
+
BUG_ON(lproto == NULL);

- return dlmunlock(dlm, lksb, flags, lproto->lp_unlock_ast, astarg);
+ return dlmunlock(dlm, lksb, o2dlm_flags,
+ lproto->lp_unlock_ast, astarg);
}


diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index 40a0024..986d059 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -21,6 +21,19 @@
#ifndef STACKGLUE_H
#define STACKGLUE_H

+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/dlmconstants.h>
+
+/*
+ * dlmconstants.h does not have a LOCAL flag. We hope to remove it
+ * some day, but right now we need it. Let's fake it. This value is larger
+ * than any flag in dlmconstants.h.
+ */
+#define DLM_LKF_LOCAL 0x00100000
+
+#include "dlm/dlmapi.h"
+
struct ocfs2_locking_protocol {
void (*lp_lock_ast)(void *astarg);
void (*lp_blocking_ast)(void *astarg, int level);
--
1.5.3.8

2008-03-06 00:36:51

by Joel Becker

Subject: [PATCH 08/18] ocfs2: Fill node number during cluster stack init

From: Mark Fasheh <[email protected]>

It doesn't make sense to query for a node number before connecting to the
cluster stack. This should be safe to do because node_num is only printed,
and we're only moving the setting of node_num slightly later in the mount
process.

[ Disconnect when node query fails -- Joel ]
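
A minimal userspace sketch of the resulting ordering, under stated assumptions: connect first, query the node number second, and disconnect again if the query fails. Every `sketch_*` name is hypothetical. Passing the connection to the query is an assumption made here to model the dependency; the function in this series takes only the output pointer.

```c
#include <errno.h>

struct sketch_conn {
	int connected;
};

/* Set non-zero to simulate a stack that cannot report a node number. */
static int sketch_query_fails;

static int sketch_connect(struct sketch_conn *conn)
{
	conn->connected = 1;
	return 0;
}

static void sketch_disconnect(struct sketch_conn *conn)
{
	conn->connected = 0;
}

static int sketch_this_node(const struct sketch_conn *conn,
			    unsigned int *node)
{
	if (!conn->connected)
		return -ENOTCONN;	/* asked before connecting */
	if (sketch_query_fails)
		return -ENOENT;
	*node = 3;
	return 0;
}

int sketch_dlm_init(struct sketch_conn *conn, int local, unsigned int *node)
{
	int status;

	if (local) {
		*node = 0;		/* local mounts never connect */
		return 0;
	}

	status = sketch_connect(conn);
	if (status < 0)
		return status;

	status = sketch_this_node(conn, node);
	if (status < 0)
		sketch_disconnect(conn);	/* undo the connect */

	return status;
}
```

The point of the cleanup branch is that a failed init leaves no connection behind, so the caller's error path does not need to know how far initialization got.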

Reviewed-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/dlmglue.c | 13 ++++++++++++-
fs/ocfs2/super.c | 33 ---------------------------------
2 files changed, 12 insertions(+), 34 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index aa13d15..9876857 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -2459,8 +2459,10 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)

mlog_entry_void();

- if (ocfs2_mount_local(osb))
+ if (ocfs2_mount_local(osb)) {
+ osb->node_num = 0;
goto local;
+ }

status = ocfs2_dlm_init_debug(osb);
if (status < 0) {
@@ -2487,6 +2489,15 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
goto bail;
}

+ status = ocfs2_cluster_this_node(&osb->node_num);
+ if (status < 0) {
+ mlog_errno(status);
+ mlog(ML_ERROR,
+ "could not find this host's node number\n");
+ ocfs2_cluster_disconnect(conn);
+ goto bail;
+ }
+
local:
ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index 8f536b3..fa9c46e 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -108,7 +108,6 @@ static int ocfs2_sync_fs(struct super_block *sb, int wait);
static int ocfs2_init_global_system_inodes(struct ocfs2_super *osb);
static int ocfs2_init_local_system_inodes(struct ocfs2_super *osb);
static void ocfs2_release_system_inodes(struct ocfs2_super *osb);
-static int ocfs2_fill_local_node_info(struct ocfs2_super *osb);
static int ocfs2_check_volume(struct ocfs2_super *osb);
static int ocfs2_verify_volume(struct ocfs2_dinode *di,
struct buffer_head *bh,
@@ -1126,32 +1125,6 @@ static int ocfs2_get_sector(struct super_block *sb,
return 0;
}

-/* ocfs2 1.0 only allows one cluster and node identity per kernel image. */
-static int ocfs2_fill_local_node_info(struct ocfs2_super *osb)
-{
- int status;
-
- /* XXX hold a ref on the node while mounte? easy enough, if
- * desirable. */
- if (ocfs2_mount_local(osb))
- osb->node_num = 0;
- else {
- status = ocfs2_cluster_this_node(&osb->node_num);
- if (status < 0) {
- mlog_errno(status);
- mlog(ML_ERROR,
- "could not find this host's node number\n");
- goto bail;
- }
- }
-
- mlog(0, "I am node %u\n", osb->node_num);
-
- status = 0;
-bail:
- return status;
-}
-
static int ocfs2_mount_volume(struct super_block *sb)
{
int status = 0;
@@ -1163,12 +1136,6 @@ static int ocfs2_mount_volume(struct super_block *sb)
if (ocfs2_is_hard_readonly(osb))
goto leave;

- status = ocfs2_fill_local_node_info(osb);
- if (status < 0) {
- mlog_errno(status);
- goto leave;
- }
-
status = ocfs2_dlm_init(osb);
if (status < 0) {
mlog_errno(status);
--
1.5.3.8

2008-03-06 00:37:45

by Joel Becker

Subject: [PATCH 14/18] ocfs2: Create ocfs2_stack_operations and split out the o2cb stack.

Define the ocfs2_stack_operations structure. Build o2cb_stack_ops from
all of the o2cb-specific stack functions. Change the generic stack glue
functions to call the stack_ops instead of the o2cb functions directly.

The o2cb functions are moved to stack_o2cb.c. The headers are cleaned up
so that each file includes only the headers it needs.

In this code, stackglue.c and stack_o2cb.c refer to some shared
extern variables. When they become modules, that will change.
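
The dispatch pattern can be sketched in userspace as a table of function pointers that the generic glue calls through. This is only an illustration of the idea: the structure layout, the `sketch_*` and `glue_*` names, and the `-ENODEV` return for "no stack selected" are all assumptions, not the definitions added by this patch.

```c
#include <errno.h>
#include <stddef.h>

/* Hypothetical operations table; one instance per cluster stack. */
struct sketch_stack_operations {
	int (*connect)(const char *group);
	int (*this_node)(unsigned int *node);
};

/* A stand-in "o2cb" backend. */
static int sketch_o2cb_connect(const char *group)
{
	return group ? 0 : -EINVAL;
}

static int sketch_o2cb_this_node(unsigned int *node)
{
	*node = 1;
	return 0;
}

static const struct sketch_stack_operations sketch_o2cb_ops = {
	.connect   = sketch_o2cb_connect,
	.this_node = sketch_o2cb_this_node,
};

/* The glue only ever sees the table, never a backend directly. */
static const struct sketch_stack_operations *glue_active_ops;

void glue_select_stack(const struct sketch_stack_operations *ops)
{
	glue_active_ops = ops;
}

int glue_connect(const char *group)
{
	if (glue_active_ops == NULL)
		return -ENODEV;
	return glue_active_ops->connect(group);
}

int glue_this_node(unsigned int *node)
{
	if (glue_active_ops == NULL)
		return -ENODEV;
	return glue_active_ops->this_node(node);
}
```

With this shape, adding another stack means supplying a second table; the `glue_*` callers stay unchanged.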

Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/Makefile | 1 +
fs/ocfs2/stack_o2cb.c | 395 +++++++++++++++++++++++++++++++++++++++++++++++++
fs/ocfs2/stackglue.c | 385 ++---------------------------------------------
fs/ocfs2/stackglue.h | 123 +++++++++++++++-
4 files changed, 532 insertions(+), 372 deletions(-)
create mode 100644 fs/ocfs2/stack_o2cb.c

diff --git a/fs/ocfs2/Makefile b/fs/ocfs2/Makefile
index 3ba64af..8e86195 100644
--- a/fs/ocfs2/Makefile
+++ b/fs/ocfs2/Makefile
@@ -25,6 +25,7 @@ ocfs2-objs := \
resize.o \
slot_map.o \
stackglue.o \
+ stack_o2cb.o \
suballoc.o \
super.o \
symlink.o \
diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c
new file mode 100644
index 0000000..c9bc354
--- /dev/null
+++ b/fs/ocfs2/stack_o2cb.c
@@ -0,0 +1,395 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * stack_o2cb.c
+ *
+ * Code which interfaces ocfs2 with the o2cb stack.
+ *
+ * Copyright (C) 2007 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation, version 2.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ */
+
+#include <linux/crc32.h>
+#include <linux/kmod.h>
+
+/* Needed for AOP_TRUNCATED_PAGE in mlog_errno() */
+#include <linux/fs.h>
+
+#include "cluster/masklog.h"
+#include "cluster/nodemanager.h"
+#include "cluster/heartbeat.h"
+
+#include "stackglue.h"
+
+struct o2dlm_private {
+ struct dlm_eviction_cb op_eviction_cb;
+};
+
+/* These should be identical */
+#if (DLM_LOCK_IV != LKM_IVMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_NL != LKM_NLMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_CR != LKM_CRMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_CW != LKM_CWMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_PR != LKM_PRMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_PW != LKM_PWMODE)
+# error Lock modes do not match
+#endif
+#if (DLM_LOCK_EX != LKM_EXMODE)
+# error Lock modes do not match
+#endif
+static inline int mode_to_o2dlm(int mode)
+{
+ BUG_ON(mode > LKM_MAXMODE);
+
+ return mode;
+}
+
+#define map_flag(_generic, _o2dlm) \
+ if (flags & (_generic)) { \
+ flags &= ~(_generic); \
+ o2dlm_flags |= (_o2dlm); \
+ }
+static int flags_to_o2dlm(u32 flags)
+{
+ int o2dlm_flags = 0;
+
+ map_flag(DLM_LKF_NOQUEUE, LKM_NOQUEUE);
+ map_flag(DLM_LKF_CANCEL, LKM_CANCEL);
+ map_flag(DLM_LKF_CONVERT, LKM_CONVERT);
+ map_flag(DLM_LKF_VALBLK, LKM_VALBLK);
+ map_flag(DLM_LKF_IVVALBLK, LKM_INVVALBLK);
+ map_flag(DLM_LKF_ORPHAN, LKM_ORPHAN);
+ map_flag(DLM_LKF_FORCEUNLOCK, LKM_FORCE);
+ map_flag(DLM_LKF_TIMEOUT, LKM_TIMEOUT);
+ map_flag(DLM_LKF_LOCAL, LKM_LOCAL);
+
+ /* map_flag() should have cleared every flag passed in */
+ BUG_ON(flags != 0);
+
+ return o2dlm_flags;
+}
+#undef map_flag
+
+/*
+ * Map an o2dlm status to standard errno values.
+ *
+ * o2dlm only uses a handful of these, and returns even fewer to the
+ * caller. Still, we try to assign sane values to each error.
+ *
+ * The following value pairs have special meanings to dlmglue, thus
+ * the right hand side needs to stay unique - never duplicate the
+ * mapping elsewhere in the table!
+ *
+ * DLM_NORMAL: 0
+ * DLM_NOTQUEUED: -EAGAIN
+ * DLM_CANCELGRANT: -EBUSY
+ * DLM_CANCEL: -DLM_ECANCEL
+ */
+/* Keep in sync with dlmapi.h */
+static int status_map[] = {
+ [DLM_NORMAL] = 0, /* Success */
+ [DLM_GRANTED] = -EINVAL,
+ [DLM_DENIED] = -EACCES,
+ [DLM_DENIED_NOLOCKS] = -EACCES,
+ [DLM_WORKING] = -EACCES,
+ [DLM_BLOCKED] = -EINVAL,
+ [DLM_BLOCKED_ORPHAN] = -EINVAL,
+ [DLM_DENIED_GRACE_PERIOD] = -EACCES,
+ [DLM_SYSERR] = -ENOMEM, /* It is what it is */
+ [DLM_NOSUPPORT] = -EPROTO,
+ [DLM_CANCELGRANT] = -EBUSY, /* Cancel after grant */
+ [DLM_IVLOCKID] = -EINVAL,
+ [DLM_SYNC] = -EINVAL,
+ [DLM_BADTYPE] = -EINVAL,
+ [DLM_BADRESOURCE] = -EINVAL,
+ [DLM_MAXHANDLES] = -ENOMEM,
+ [DLM_NOCLINFO] = -EINVAL,
+ [DLM_NOLOCKMGR] = -EINVAL,
+ [DLM_NOPURGED] = -EINVAL,
+ [DLM_BADARGS] = -EINVAL,
+ [DLM_VOID] = -EINVAL,
+ [DLM_NOTQUEUED] = -EAGAIN, /* Trylock failed */
+ [DLM_IVBUFLEN] = -EINVAL,
+ [DLM_CVTUNGRANT] = -EPERM,
+ [DLM_BADPARAM] = -EINVAL,
+ [DLM_VALNOTVALID] = -EINVAL,
+ [DLM_REJECTED] = -EPERM,
+ [DLM_ABORT] = -EINVAL,
+ [DLM_CANCEL] = -DLM_ECANCEL, /* Successful cancel */
+ [DLM_IVRESHANDLE] = -EINVAL,
+ [DLM_DEADLOCK] = -EDEADLK,
+ [DLM_DENIED_NOASTS] = -EINVAL,
+ [DLM_FORWARD] = -EINVAL,
+ [DLM_TIMEOUT] = -ETIMEDOUT,
+ [DLM_IVGROUPID] = -EINVAL,
+ [DLM_VERS_CONFLICT] = -EOPNOTSUPP,
+ [DLM_BAD_DEVICE_PATH] = -ENOENT,
+ [DLM_NO_DEVICE_PERMISSION] = -EPERM,
+ [DLM_NO_CONTROL_DEVICE] = -ENOENT,
+ [DLM_RECOVERING] = -ENOTCONN,
+ [DLM_MIGRATING] = -ERESTART,
+ [DLM_MAXSTATS] = -EINVAL,
+};
+
+static int dlm_status_to_errno(enum dlm_status status)
+{
+ BUG_ON(status >= (sizeof(status_map) / sizeof(status_map[0])));
+
+ return status_map[status];
+}
+
+static void o2dlm_lock_ast_wrapper(void *astarg)
+{
+ BUG_ON(stack_glue_lproto == NULL);
+
+ stack_glue_lproto->lp_lock_ast(astarg);
+}
+
+static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
+{
+ BUG_ON(stack_glue_lproto == NULL);
+
+ stack_glue_lproto->lp_blocking_ast(astarg, level);
+}
+
+static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
+{
+ int error = dlm_status_to_errno(status);
+
+ BUG_ON(stack_glue_lproto == NULL);
+
+ /*
+ * In o2dlm, you can get both the lock_ast() for the lock being
+ * granted and the unlock_ast() for the CANCEL failing. A
+ * successful cancel sends DLM_NORMAL here. If the
+ * lock grant happened before the cancel arrived, you get
+ * DLM_CANCELGRANT.
+ *
+ * There's no need for the double-ast. If we see DLM_CANCELGRANT,
+ * we just ignore it. We expect the lock_ast() to handle the
+ * granted lock.
+ */
+ if (status == DLM_CANCELGRANT)
+ return;
+
+ stack_glue_lproto->lp_unlock_ast(astarg, error);
+}
+
+static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
+ int mode,
+ union ocfs2_dlm_lksb *lksb,
+ u32 flags,
+ void *name,
+ unsigned int namelen,
+ void *astarg)
+{
+ enum dlm_status status;
+ int o2dlm_mode = mode_to_o2dlm(mode);
+ int o2dlm_flags = flags_to_o2dlm(flags);
+ int ret;
+
+ status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm,
+ o2dlm_flags, name, namelen,
+ o2dlm_lock_ast_wrapper, astarg,
+ o2dlm_blocking_ast_wrapper);
+ ret = dlm_status_to_errno(status);
+ return ret;
+}
+
+static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn,
+ union ocfs2_dlm_lksb *lksb,
+ u32 flags,
+ void *astarg)
+{
+ enum dlm_status status;
+ int o2dlm_flags = flags_to_o2dlm(flags);
+ int ret;
+
+ status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm,
+ o2dlm_flags, o2dlm_unlock_ast_wrapper, astarg);
+ ret = dlm_status_to_errno(status);
+ return ret;
+}
+
+static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
+{
+ return dlm_status_to_errno(lksb->lksb_o2dlm.status);
+}
+
+static void *o2cb_dlm_lvb(union ocfs2_dlm_lksb *lksb)
+{
+ return (void *)(lksb->lksb_o2dlm.lvb);
+}
+
+static void o2cb_dump_lksb(union ocfs2_dlm_lksb *lksb)
+{
+ dlm_print_one_lock(lksb->lksb_o2dlm.lockid);
+}
+
+/*
+ * Called from the dlm when it's about to evict a node. This is how the
+ * classic stack signals node death.
+ */
+static void o2dlm_eviction_cb(int node_num, void *data)
+{
+ struct ocfs2_cluster_connection *conn = data;
+
+ mlog(ML_NOTICE, "o2dlm has evicted node %d from group %.*s\n",
+ node_num, conn->cc_namelen, conn->cc_name);
+
+ conn->cc_recovery_handler(node_num, conn->cc_recovery_data);
+}
+
+static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn)
+{
+ int rc = 0;
+ u32 dlm_key;
+ struct dlm_ctxt *dlm;
+ struct o2dlm_private *priv;
+ struct dlm_protocol_version dlm_version;
+
+ BUG_ON(conn == NULL);
+
+ /* for now we only have one cluster/node, make sure we see it
+ * in the heartbeat universe */
+ if (!o2hb_check_local_node_heartbeating()) {
+ rc = -EINVAL;
+ goto out;
+ }
+
+ priv = kzalloc(sizeof(struct o2dlm_private), GFP_KERNEL);
+ if (!priv) {
+ rc = -ENOMEM;
+ goto out_free;
+ }
+
+ /* This just fills the structure in. It is safe to pass conn. */
+ dlm_setup_eviction_cb(&priv->op_eviction_cb, o2dlm_eviction_cb,
+ conn);
+
+ conn->cc_private = priv;
+
+ /* used by the dlm code to make message headers unique, each
+ * node in this domain must agree on this. */
+ dlm_key = crc32_le(0, conn->cc_name, conn->cc_namelen);
+ dlm_version.pv_major = conn->cc_version.pv_major;
+ dlm_version.pv_minor = conn->cc_version.pv_minor;
+
+ dlm = dlm_register_domain(conn->cc_name, dlm_key, &dlm_version);
+ if (IS_ERR(dlm)) {
+ rc = PTR_ERR(dlm);
+ mlog_errno(rc);
+ goto out_free;
+ }
+
+ conn->cc_version.pv_major = dlm_version.pv_major;
+ conn->cc_version.pv_minor = dlm_version.pv_minor;
+ conn->cc_lockspace = dlm;
+
+ dlm_register_eviction_cb(dlm, &priv->op_eviction_cb);
+
+out_free:
+ if (rc && conn->cc_private)
+ kfree(conn->cc_private);
+
+out:
+ return rc;
+}
+
+static int o2cb_cluster_disconnect(struct ocfs2_cluster_connection *conn)
+{
+ struct dlm_ctxt *dlm = conn->cc_lockspace;
+ struct o2dlm_private *priv = conn->cc_private;
+
+ dlm_unregister_eviction_cb(&priv->op_eviction_cb);
+ conn->cc_private = NULL;
+ kfree(priv);
+
+ dlm_unregister_domain(dlm);
+ conn->cc_lockspace = NULL;
+
+ return 0;
+}
+
+static void o2hb_stop(const char *group)
+{
+ int ret;
+ char *argv[5], *envp[3];
+
+ argv[0] = (char *)o2nm_get_hb_ctl_path();
+ argv[1] = "-K";
+ argv[2] = "-u";
+ argv[3] = (char *)group;
+ argv[4] = NULL;
+
+ mlog(0, "Run: %s %s %s %s\n", argv[0], argv[1], argv[2], argv[3]);
+
+ /* minimal command environment taken from cpu_run_sbin_hotplug */
+ envp[0] = "HOME=/";
+ envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
+ envp[2] = NULL;
+
+ ret = call_usermodehelper(argv[0], argv, envp, UMH_WAIT_PROC);
+ if (ret < 0)
+ mlog_errno(ret);
+}
+
+/*
+ * Hangup is a hack for tools compatibility. Older ocfs2-tools software
+ * expects the filesystem to call "ocfs2_hb_ctl" during unmount. This
+ * happens regardless of whether the DLM got started, so we can't do it
+ * in ocfs2_cluster_disconnect(). We bring the o2hb_stop() function into
+ * the glue and provide a "hangup" API for super.c to call.
+ *
+ * Other stacks will eventually provide a NULL ->hangup() pointer.
+ */
+static void o2cb_cluster_hangup(const char *group, int grouplen)
+{
+ o2hb_stop(group);
+}
+
+static int o2cb_cluster_this_node(unsigned int *node)
+{
+ int node_num;
+
+ node_num = o2nm_this_node();
+ if (node_num == O2NM_INVALID_NODE_NUM)
+ return -ENOENT;
+
+ if (node_num >= O2NM_MAX_NODES)
+ return -EOVERFLOW;
+
+ *node = node_num;
+ return 0;
+}
+
+struct ocfs2_stack_operations o2cb_stack_ops = {
+ .connect = o2cb_cluster_connect,
+ .disconnect = o2cb_cluster_disconnect,
+ .hangup = o2cb_cluster_hangup,
+ .this_node = o2cb_cluster_this_node,
+ .dlm_lock = o2cb_dlm_lock,
+ .dlm_unlock = o2cb_dlm_unlock,
+ .lock_status = o2cb_dlm_lock_status,
+ .lock_lvb = o2cb_dlm_lvb,
+ .dump_lksb = o2cb_dump_lksb,
+};
+
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index e35dde6..e197367 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -19,204 +19,17 @@
*/

#include <linux/slab.h>
-#include <linux/crc32.h>
#include <linux/kmod.h>

/* Needed for AOP_TRUNCATED_PAGE in mlog_errno() */
#include <linux/fs.h>

#include "cluster/masklog.h"
-#include "cluster/nodemanager.h"
-#include "cluster/heartbeat.h"

#include "stackglue.h"

-static struct ocfs2_locking_protocol *lproto;
-
-struct o2dlm_private {
- struct dlm_eviction_cb op_eviction_cb;
-};
-
-/* These should be identical */
-#if (DLM_LOCK_IV != LKM_IVMODE)
-# error Lock modes do not match
-#endif
-#if (DLM_LOCK_NL != LKM_NLMODE)
-# error Lock modes do not match
-#endif
-#if (DLM_LOCK_CR != LKM_CRMODE)
-# error Lock modes do not match
-#endif
-#if (DLM_LOCK_CW != LKM_CWMODE)
-# error Lock modes do not match
-#endif
-#if (DLM_LOCK_PR != LKM_PRMODE)
-# error Lock modes do not match
-#endif
-#if (DLM_LOCK_PW != LKM_PWMODE)
-# error Lock modes do not match
-#endif
-#if (DLM_LOCK_EX != LKM_EXMODE)
-# error Lock modes do not match
-#endif
-static inline int mode_to_o2dlm(int mode)
-{
- BUG_ON(mode > LKM_MAXMODE);
-
- return mode;
-}
-
-#define map_flag(_generic, _o2dlm) \
- if (flags & (_generic)) { \
- flags &= ~(_generic); \
- o2dlm_flags |= (_o2dlm); \
- }
-static int flags_to_o2dlm(u32 flags)
-{
- int o2dlm_flags = 0;
-
- map_flag(DLM_LKF_NOQUEUE, LKM_NOQUEUE);
- map_flag(DLM_LKF_CANCEL, LKM_CANCEL);
- map_flag(DLM_LKF_CONVERT, LKM_CONVERT);
- map_flag(DLM_LKF_VALBLK, LKM_VALBLK);
- map_flag(DLM_LKF_IVVALBLK, LKM_INVVALBLK);
- map_flag(DLM_LKF_ORPHAN, LKM_ORPHAN);
- map_flag(DLM_LKF_FORCEUNLOCK, LKM_FORCE);
- map_flag(DLM_LKF_TIMEOUT, LKM_TIMEOUT);
- map_flag(DLM_LKF_LOCAL, LKM_LOCAL);
-
- /* map_flag() should have cleared every flag passed in */
- BUG_ON(flags != 0);
-
- return o2dlm_flags;
-}
-#undef map_flag
-
-/*
- * Map an o2dlm status to standard errno values.
- *
- * o2dlm only uses a handful of these, and returns even fewer to the
- * caller. Still, we try to assign sane values to each error.
- *
- * The following value pairs have special meanings to dlmglue, thus
- * the right hand side needs to stay unique - never duplicate the
- * mapping elsewhere in the table!
- *
- * DLM_NORMAL: 0
- * DLM_NOTQUEUED: -EAGAIN
- * DLM_CANCELGRANT: -EBUSY
- * DLM_CANCEL: -DLM_ECANCEL
- */
-/* Keep in sync with dlmapi.h */
-static int status_map[] = {
- [DLM_NORMAL] = 0, /* Success */
- [DLM_GRANTED] = -EINVAL,
- [DLM_DENIED] = -EACCES,
- [DLM_DENIED_NOLOCKS] = -EACCES,
- [DLM_WORKING] = -EACCES,
- [DLM_BLOCKED] = -EINVAL,
- [DLM_BLOCKED_ORPHAN] = -EINVAL,
- [DLM_DENIED_GRACE_PERIOD] = -EACCES,
- [DLM_SYSERR] = -ENOMEM, /* It is what it is */
- [DLM_NOSUPPORT] = -EPROTO,
- [DLM_CANCELGRANT] = -EBUSY, /* Cancel after grant */
- [DLM_IVLOCKID] = -EINVAL,
- [DLM_SYNC] = -EINVAL,
- [DLM_BADTYPE] = -EINVAL,
- [DLM_BADRESOURCE] = -EINVAL,
- [DLM_MAXHANDLES] = -ENOMEM,
- [DLM_NOCLINFO] = -EINVAL,
- [DLM_NOLOCKMGR] = -EINVAL,
- [DLM_NOPURGED] = -EINVAL,
- [DLM_BADARGS] = -EINVAL,
- [DLM_VOID] = -EINVAL,
- [DLM_NOTQUEUED] = -EAGAIN, /* Trylock failed */
- [DLM_IVBUFLEN] = -EINVAL,
- [DLM_CVTUNGRANT] = -EPERM,
- [DLM_BADPARAM] = -EINVAL,
- [DLM_VALNOTVALID] = -EINVAL,
- [DLM_REJECTED] = -EPERM,
- [DLM_ABORT] = -EINVAL,
- [DLM_CANCEL] = -DLM_ECANCEL, /* Successful cancel */
- [DLM_IVRESHANDLE] = -EINVAL,
- [DLM_DEADLOCK] = -EDEADLK,
- [DLM_DENIED_NOASTS] = -EINVAL,
- [DLM_FORWARD] = -EINVAL,
- [DLM_TIMEOUT] = -ETIMEDOUT,
- [DLM_IVGROUPID] = -EINVAL,
- [DLM_VERS_CONFLICT] = -EOPNOTSUPP,
- [DLM_BAD_DEVICE_PATH] = -ENOENT,
- [DLM_NO_DEVICE_PERMISSION] = -EPERM,
- [DLM_NO_CONTROL_DEVICE] = -ENOENT,
- [DLM_RECOVERING] = -ENOTCONN,
- [DLM_MIGRATING] = -ERESTART,
- [DLM_MAXSTATS] = -EINVAL,
-};
-
-static int dlm_status_to_errno(enum dlm_status status)
-{
- BUG_ON(status > (sizeof(status_map) / sizeof(status_map[0])));
+struct ocfs2_locking_protocol *stack_glue_lproto;

- return status_map[status];
-}
-
-static void o2dlm_lock_ast_wrapper(void *astarg)
-{
- BUG_ON(lproto == NULL);
-
- lproto->lp_lock_ast(astarg);
-}
-
-static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
-{
- BUG_ON(lproto == NULL);
-
- lproto->lp_blocking_ast(astarg, level);
-}
-
-static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
-{
- int error = dlm_status_to_errno(status);
-
- BUG_ON(lproto == NULL);
-
- /*
- * In o2dlm, you can get both the lock_ast() for the lock being
- * granted and the unlock_ast() for the CANCEL failing. A
- * successful cancel sends DLM_NORMAL here. If the
- * lock grant happened before the cancel arrived, you get
- * DLM_CANCELGRANT.
- *
- * There's no need for the double-ast. If we see DLM_CANCELGRANT,
- * we just ignore it. We expect the lock_ast() to handle the
- * granted lock.
- */
- if (status == DLM_CANCELGRANT)
- return;
-
- lproto->lp_unlock_ast(astarg, error);
-}
-
-static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
- int mode,
- union ocfs2_dlm_lksb *lksb,
- u32 flags,
- void *name,
- unsigned int namelen,
- void *astarg)
-{
- enum dlm_status status;
- int o2dlm_mode = mode_to_o2dlm(mode);
- int o2dlm_flags = flags_to_o2dlm(flags);
- int ret;
-
- status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm,
- o2dlm_flags, name, namelen,
- o2dlm_lock_ast_wrapper, astarg,
- o2dlm_blocking_ast_wrapper);
- ret = dlm_status_to_errno(status);
- return ret;
-}

int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
@@ -226,25 +39,10 @@ int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
unsigned int namelen,
void *astarg)
{
- BUG_ON(lproto == NULL);
-
- return o2cb_dlm_lock(conn, mode, lksb, flags,
- name, namelen, astarg);
-}
-
-static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn,
- union ocfs2_dlm_lksb *lksb,
- u32 flags,
- void *astarg)
-{
- enum dlm_status status;
- int o2dlm_flags = flags_to_o2dlm(flags);
- int ret;
+ BUG_ON(stack_glue_lproto == NULL);

- status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm,
- o2dlm_flags, o2dlm_unlock_ast_wrapper, astarg);
- ret = dlm_status_to_errno(status);
- return ret;
+ return o2cb_stack_ops.dlm_lock(conn, mode, lksb, flags,
+ name, namelen, astarg);
}

int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
@@ -252,19 +50,14 @@ int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
u32 flags,
void *astarg)
{
- BUG_ON(lproto == NULL);
+ BUG_ON(stack_glue_lproto == NULL);

- return o2cb_dlm_unlock(conn, lksb, flags, astarg);
-}
-
-static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
-{
- return dlm_status_to_errno(lksb->lksb_o2dlm.status);
+ return o2cb_stack_ops.dlm_unlock(conn, lksb, flags, astarg);
}

int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
{
- return o2cb_dlm_lock_status(lksb);
+ return o2cb_stack_ops.lock_status(lksb);
}

/*
@@ -272,94 +65,14 @@ int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
* don't cast at the glue level. The real answer is that the header
* ordering is nigh impossible.
*/
-static void *o2cb_dlm_lvb(union ocfs2_dlm_lksb *lksb)
-{
- return (void *)(lksb->lksb_o2dlm.lvb);
-}
-
void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb)
{
- return o2cb_dlm_lvb(lksb);
-}
-
-static void o2cb_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
-{
- dlm_print_one_lock(lksb->lksb_o2dlm.lockid);
+ return o2cb_stack_ops.lock_lvb(lksb);
}

void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
{
- o2cb_dlm_dump_lksb(lksb);
-}
-
-/*
- * Called from the dlm when it's about to evict a node. This is how the
- * classic stack signals node death.
- */
-static void o2dlm_eviction_cb(int node_num, void *data)
-{
- struct ocfs2_cluster_connection *conn = data;
-
- mlog(ML_NOTICE, "o2dlm has evicted node %d from group %.*s\n",
- node_num, conn->cc_namelen, conn->cc_name);
-
- conn->cc_recovery_handler(node_num, conn->cc_recovery_data);
-}
-
-static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn)
-{
- int rc = 0;
- u32 dlm_key;
- struct dlm_ctxt *dlm;
- struct o2dlm_private *priv;
- struct dlm_protocol_version dlm_version;
-
- BUG_ON(conn == NULL);
-
- /* for now we only have one cluster/node, make sure we see it
- * in the heartbeat universe */
- if (!o2hb_check_local_node_heartbeating()) {
- rc = -EINVAL;
- goto out;
- }
-
- priv = kzalloc(sizeof(struct o2dlm_private), GFP_KERNEL);
- if (!priv) {
- rc = -ENOMEM;
- goto out_free;
- }
-
- /* This just fills the structure in. It is safe to pass conn. */
- dlm_setup_eviction_cb(&priv->op_eviction_cb, o2dlm_eviction_cb,
- conn);
-
- conn->cc_private = priv;
-
- /* used by the dlm code to make message headers unique, each
- * node in this domain must agree on this. */
- dlm_key = crc32_le(0, conn->cc_name, conn->cc_namelen);
- dlm_version.pv_major = conn->cc_version.pv_major;
- dlm_version.pv_minor = conn->cc_version.pv_minor;
-
- dlm = dlm_register_domain(conn->cc_name, dlm_key, &dlm_version);
- if (IS_ERR(dlm)) {
- rc = PTR_ERR(dlm);
- mlog_errno(rc);
- goto out_free;
- }
-
- conn->cc_version.pv_major = dlm_version.pv_major;
- conn->cc_version.pv_minor = dlm_version.pv_minor;
- conn->cc_lockspace = dlm;
-
- dlm_register_eviction_cb(dlm, &priv->op_eviction_cb);
-
-out_free:
- if (rc && conn->cc_private)
- kfree(conn->cc_private);
-
-out:
- return rc;
+ o2cb_stack_ops.dump_lksb(lksb);
}

int ocfs2_cluster_connect(const char *group,
@@ -394,9 +107,9 @@ int ocfs2_cluster_connect(const char *group,
new_conn->cc_recovery_data = recovery_data;

/* Start the new connection at our maximum compatibility level */
- new_conn->cc_version = lproto->lp_max_version;
+ new_conn->cc_version = stack_glue_lproto->lp_max_version;

- rc = o2cb_cluster_connect(new_conn);
+ rc = o2cb_stack_ops.connect(new_conn);
if (rc) {
mlog_errno(rc);
goto out_free;
@@ -412,29 +125,13 @@ out:
return rc;
}

-
-static int o2cb_cluster_disconnect(struct ocfs2_cluster_connection *conn)
-{
- struct dlm_ctxt *dlm = conn->cc_lockspace;
- struct o2dlm_private *priv = conn->cc_private;
-
- dlm_unregister_eviction_cb(&priv->op_eviction_cb);
- conn->cc_private = NULL;
- kfree(priv);
-
- dlm_unregister_domain(dlm);
- conn->cc_lockspace = NULL;
-
- return 0;
-}
-
int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn)
{
int ret;

BUG_ON(conn == NULL);

- ret = o2cb_cluster_disconnect(conn);
+ ret = o2cb_stack_ops.disconnect(conn);

/* XXX Should we free it anyway? */
if (!ret)
@@ -443,75 +140,23 @@ int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn)
return ret;
}

-static void o2hb_stop(const char *group)
-{
- int ret;
- char *argv[5], *envp[3];
-
- argv[0] = (char *)o2nm_get_hb_ctl_path();
- argv[1] = "-K";
- argv[2] = "-u";
- argv[3] = (char *)group;
- argv[4] = NULL;
-
- mlog(0, "Run: %s %s %s %s\n", argv[0], argv[1], argv[2], argv[3]);
-
- /* minimal command environment taken from cpu_run_sbin_hotplug */
- envp[0] = "HOME=/";
- envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
- envp[2] = NULL;
-
- ret = call_usermodehelper(argv[0], argv, envp, UMH_WAIT_PROC);
- if (ret < 0)
- mlog_errno(ret);
-}
-
-/*
- * Hangup is a hack for tools compatibility. Older ocfs2-tools software
- * expects the filesystem to call "ocfs2_hb_ctl" during unmount. This
- * happens regardless of whether the DLM got started, so we can't do it
- * in ocfs2_cluster_disconnect(). We bring the o2hb_stop() function into
- * the glue and provide a "hangup" API for super.c to call.
- *
- * Other stacks will eventually provide a NULL ->hangup() pointer.
- */
-static void o2cb_cluster_hangup(const char *group, int grouplen)
-{
- o2hb_stop(group);
-}
-
void ocfs2_cluster_hangup(const char *group, int grouplen)
{
BUG_ON(group == NULL);
BUG_ON(group[grouplen] != '\0');

- o2cb_cluster_hangup(group, grouplen);
-}
-
-static int o2cb_cluster_this_node(unsigned int *node)
-{
- int node_num;
-
- node_num = o2nm_this_node();
- if (node_num == O2NM_INVALID_NODE_NUM)
- return -ENOENT;
-
- if (node_num >= O2NM_MAX_NODES)
- return -EOVERFLOW;
-
- *node = node_num;
- return 0;
+ o2cb_stack_ops.hangup(group, grouplen);
}

int ocfs2_cluster_this_node(unsigned int *node)
{
- return o2cb_cluster_this_node(node);
+ return o2cb_stack_ops.this_node(node);
}

void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)
{
BUG_ON(proto == NULL);

- lproto = proto;
+ stack_glue_lproto = proto;
}

diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index decb147..0836322 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -25,6 +25,8 @@
#include <linux/list.h>
#include <linux/dlmconstants.h>

+#include "dlm/dlmapi.h"
+
/*
* dlmconstants.h does not have a LOCAL flag. We hope to remove it
* some day, but right now we need it. Let's fake it. This value is larger
@@ -39,13 +41,18 @@
#define GROUP_NAME_MAX 64


-#include "dlm/dlmapi.h"
-
+/*
+ * ocfs2_protocol_version changes when ocfs2 does something different in
+ * its inter-node behavior. See dlmglue.c for more information.
+ */
struct ocfs2_protocol_version {
u8 pv_major;
u8 pv_minor;
};

+/*
+ * The ocfs2_locking_protocol defines the handlers called on ocfs2's behalf.
+ */
struct ocfs2_locking_protocol {
struct ocfs2_protocol_version lp_max_version;
void (*lp_lock_ast)(void *astarg);
@@ -53,10 +60,20 @@ struct ocfs2_locking_protocol {
void (*lp_unlock_ast)(void *astarg, int error);
};

+/*
+ * A union of all lock status structures. We define it here so that the
+ * size of the union is known. Lock status structures are embedded in
+ * ocfs2 inodes.
+ */
union ocfs2_dlm_lksb {
struct dlm_lockstatus lksb_o2dlm;
};

+/*
+ * A cluster connection. Mostly opaque to ocfs2, the connection holds
+ * state for the underlying stack. ocfs2 does use cc_version to determine
+ * locking compatibility.
+ */
struct ocfs2_cluster_connection {
char cc_name[GROUP_NAME_MAX];
int cc_namelen;
@@ -67,6 +84,106 @@ struct ocfs2_cluster_connection {
void *cc_private;
};

+/*
+ * Each cluster stack implements the stack operations structure. The
+ * ocfs2 code never uses it directly; the stackglue code translates
+ * generic cluster calls into stack operations.
+ */
+struct ocfs2_stack_operations {
+ /*
+ * The fs code calls ocfs2_cluster_connect() to attach a new
+ * filesystem to the cluster stack. The ->connect() op is passed
+ * an ocfs2_cluster_connection with the name and recovery field
+ * filled in.
+ *
+ * The stack must set up any notification mechanisms and create
+ * the filesystem lockspace in the DLM. The lockspace should be
+ * stored on cc_lockspace. Any other information can be stored on
+ * cc_private.
+ *
+ * ->connect() must not return until it is guaranteed that
+ *
+ * - Node down notifications for the filesystem will be received
+ * and passed to conn->cc_recovery_handler().
+ * - Locking requests for the filesystem will be processed.
+ */
+ int (*connect)(struct ocfs2_cluster_connection *conn);
+
+ /*
+ * The fs code calls ocfs2_cluster_disconnect() when a filesystem
+ * no longer needs cluster services. All DLM locks have been
+ * dropped, and recovery notification is being ignored by the
+ * fs code. The stack must disengage from the DLM and discontinue
+ * recovery notification.
+ *
+ * Once ->disconnect() has returned, the connection structure will
+ * be freed. Thus, a stack must not return from ->disconnect()
+ * until it will no longer reference the conn pointer.
+ */
+ int (*disconnect)(struct ocfs2_cluster_connection *conn);
+
+ /*
+ * ocfs2_cluster_hangup() exists for compatibility with older
+ * ocfs2 tools. Only the classic stack really needs it. As such
+ * ->hangup() is not required of all stacks. See the comment by
+ * ocfs2_cluster_hangup() for more details.
+ */
+ void (*hangup)(const char *group, int grouplen);
+
+ /*
+ * ->this_node() returns the cluster's unique identifier for the
+ * local node.
+ */
+ int (*this_node)(unsigned int *node);
+
+ /*
+ * Call the underlying dlm lock function. The ->dlm_lock()
+ * callback should convert the flags and mode as appropriate.
+ *
+ * ast and bast functions are not part of the call because the
+ * stack will likely want to wrap ast and bast calls before passing
+ * them to stack->sp_proto.
+ */
+ int (*dlm_lock)(struct ocfs2_cluster_connection *conn,
+ int mode,
+ union ocfs2_dlm_lksb *lksb,
+ u32 flags,
+ void *name,
+ unsigned int namelen,
+ void *astarg);
+
+ /*
+ * Call the underlying dlm unlock function. The ->dlm_unlock()
+ * function should convert the flags as appropriate.
+ *
+ * The unlock ast is not passed, as the stack will want to wrap
+ * it before calling stack->sp_proto->lp_unlock_ast().
+ */
+ int (*dlm_unlock)(struct ocfs2_cluster_connection *conn,
+ union ocfs2_dlm_lksb *lksb,
+ u32 flags,
+ void *astarg);
+
+ /*
+ * Return the status of the current lock status block. The fs
+ * code should never dereference the union. The ->lock_status()
+ * callback pulls out the stack-specific lksb, converts the status
+ * to a proper errno, and returns it.
+ */
+ int (*lock_status)(union ocfs2_dlm_lksb *lksb);
+
+ /*
+ * Pull the lvb pointer off of the stack-specific lksb.
+ */
+ void *(*lock_lvb)(union ocfs2_dlm_lksb *lksb);
+
+ /*
+ * This is an optional debugging hook. If provided, the
+ * stack can dump debugging information about this lock.
+ */
+ void (*dump_lksb)(union ocfs2_dlm_lksb *lksb);
+};
+
int ocfs2_cluster_connect(const char *group,
int grouplen,
void (*recovery_handler)(int node_num,
@@ -95,4 +212,6 @@ void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb);

void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto);

+extern struct ocfs2_locking_protocol *stack_glue_lproto;
+extern struct ocfs2_stack_operations o2cb_stack_ops;
#endif /* STACKGLUE_H */
--
1.5.3.8
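
The dispatch pattern this patch introduces can be sketched in plain userspace C. This is a minimal illustration under stated assumptions, not the kernel code: every demo_ name is hypothetical, the connection and operations structures are trimmed stand-ins for the ones in stackglue.h, and the glue functions take the operations table as a parameter, whereas the patch calls o2cb_stack_ops directly. The sketch also assumes that a stack may leave ->hangup() NULL, as the comment above o2cb_cluster_hangup() anticipates.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical, trimmed-down stand-in for struct ocfs2_cluster_connection. */
struct demo_connection {
	const char *name;
	int connected;
};

/* Hypothetical stand-in for struct ocfs2_stack_operations. */
struct demo_stack_operations {
	int (*connect)(struct demo_connection *conn);
	int (*disconnect)(struct demo_connection *conn);
	void (*hangup)(const char *group, int grouplen);	/* optional */
};

static int demo_hangup_calls;

static int demo_o2cb_connect(struct demo_connection *conn)
{
	conn->connected = 1;
	return 0;
}

static int demo_o2cb_disconnect(struct demo_connection *conn)
{
	conn->connected = 0;
	return 0;
}

static void demo_o2cb_hangup(const char *group, int grouplen)
{
	(void)group;
	(void)grouplen;
	demo_hangup_calls++;
}

/* A "classic" stack that implements every operation. */
static const struct demo_stack_operations demo_o2cb_ops = {
	.connect	= demo_o2cb_connect,
	.disconnect	= demo_o2cb_disconnect,
	.hangup		= demo_o2cb_hangup,
};

/* Another stack that reuses connect/disconnect but has no hangup. */
static const struct demo_stack_operations demo_other_ops = {
	.connect	= demo_o2cb_connect,
	.disconnect	= demo_o2cb_disconnect,
	.hangup		= NULL,
};

/* Glue entry point: forward the generic call to the chosen stack. */
static int demo_cluster_connect(const struct demo_stack_operations *ops,
				struct demo_connection *conn)
{
	if (ops == NULL || ops->connect == NULL)
		return -EINVAL;
	return ops->connect(conn);
}

/* Glue entry point: hangup is optional, so check before calling. */
static void demo_cluster_hangup(const struct demo_stack_operations *ops,
				const char *group, int grouplen)
{
	if (ops != NULL && ops->hangup != NULL)
		ops->hangup(group, grouplen);
}
```

The filesystem side only ever sees demo_cluster_connect() and demo_cluster_hangup(); which table sits behind them is the glue's business, which is the property the later fs/dlm plug-in depends on.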

2008-03-06 00:38:16

by Joel Becker

Subject: [PATCH 12/18] ocfs2: Clean up stackglue initialization

Give the stackglue initialization functions proper names and drop the
exit hooks, which only cleared the protocol pointer. The result can be
called cleanly when stackglue becomes a module.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/dlmglue.c | 9 ++-------
fs/ocfs2/dlmglue.h | 5 ++---
fs/ocfs2/stackglue.c | 8 ++------
fs/ocfs2/stackglue.h | 3 +--
fs/ocfs2/super.c | 6 ++----
5 files changed, 9 insertions(+), 22 deletions(-)
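
The registration step renamed here can be sketched in userspace C. This is a rough illustration, assuming the intent is that the filesystem hands the glue one protocol table at init time and that a NULL table is rejected. All demo_ names are hypothetical, and the error returns stand in for the kernel's BUG_ON() checks.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical stand-in for struct ocfs2_locking_protocol. */
struct demo_locking_protocol {
	void (*lp_lock_ast)(void *astarg);
	void (*lp_blocking_ast)(void *astarg, int level);
	void (*lp_unlock_ast)(void *astarg, int error);
};

/* The glue keeps a single pointer to the filesystem's callbacks. */
static const struct demo_locking_protocol *demo_lproto;

/* Called once by the filesystem at init time; NULL is refused. */
static int demo_set_locking_protocol(const struct demo_locking_protocol *proto)
{
	if (proto == NULL)
		return -EINVAL;
	demo_lproto = proto;
	return 0;
}

/* Example filesystem callback: just record the error it was given. */
static int demo_last_error;

static void demo_unlock_ast(void *astarg, int error)
{
	(void)astarg;
	demo_last_error = error;
}

static const struct demo_locking_protocol demo_proto = {
	.lp_unlock_ast = demo_unlock_ast,
};

/* What a stack-side wrapper does: hand the event back to the filesystem. */
static int demo_deliver_unlock(void *astarg, int error)
{
	if (demo_lproto == NULL || demo_lproto->lp_unlock_ast == NULL)
		return -ENOTCONN;
	demo_lproto->lp_unlock_ast(astarg, error);
	return 0;
}
```

Because the pointer is set once and never cleared, no matching "put" call is needed, which fits the removal of the exit hooks in this patch.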

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 58a34df..435d7e0 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -3366,16 +3366,11 @@ static struct ocfs2_locking_protocol lproto = {
.lp_unlock_ast = ocfs2_unlock_ast,
};

-/* This interface isn't the final one, hence the less-than-perfect names */
-void dlmglue_init_stack(void)
+void ocfs2_set_locking_protocol(void)
{
- o2cb_get_stack(&lproto);
+ ocfs2_stack_glue_set_locking_protocol(&lproto);
}

-void dlmglue_exit_stack(void)
-{
- o2cb_put_stack();
-}

static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
index 2d0a8a0..34b7598 100644
--- a/fs/ocfs2/dlmglue.h
+++ b/fs/ocfs2/dlmglue.h
@@ -114,7 +114,6 @@ void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb);
struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void);
void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug);

-void dlmglue_init_stack(void);
-void dlmglue_exit_stack(void);
-
+/* To set the locking protocol on module initialization */
+void ocfs2_set_locking_protocol(void);
#endif /* DLMGLUE_H */
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index bd80541..51c2546 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -429,14 +429,10 @@ int ocfs2_cluster_this_node(unsigned int *node)
return 0;
}

-void o2cb_get_stack(struct ocfs2_locking_protocol *proto)
+void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)
{
BUG_ON(proto == NULL);

lproto = proto;
}

-void o2cb_put_stack(void)
-{
- lproto = NULL;
-}
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index 01e3c9b..decb147 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -93,7 +93,6 @@ int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb);
void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb);
void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb);

-void o2cb_get_stack(struct ocfs2_locking_protocol *proto);
-void o2cb_put_stack(void);
+void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto);

#endif /* STACKGLUE_H */
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index fa9c46e..b4a02a0 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -922,8 +922,6 @@ static int __init ocfs2_init(void)

ocfs2_print_version();

- dlmglue_init_stack();
-
status = init_ocfs2_uptodate_cache();
if (status < 0) {
mlog_errno(status);
@@ -948,6 +946,8 @@ static int __init ocfs2_init(void)
mlog(ML_ERROR, "Unable to create ocfs2 debugfs root.\n");
}

+ ocfs2_set_locking_protocol();
+
leave:
if (status < 0) {
ocfs2_free_mem_caches();
@@ -979,8 +979,6 @@ static void __exit ocfs2_exit(void)

exit_ocfs2_uptodate_cache();

- dlmglue_exit_stack();
-
mlog_exit_void();
}

--
1.5.3.8

2008-03-06 00:38:49

by Joel Becker

Subject: [PATCH 03/18] ocfs2: Use -errno instead of dlm_status for ocfs2_dlm_lock/unlock() API.

Change the ocfs2_dlm_lock/unlock() functions to return -errno values.
This is the first step towards eliminating dlm_status in
fs/ocfs2/dlmglue.c. The change also passes -errno values to
->unlock_ast().

[ Fix a return code in dlmglue.c and change the error translation table into
an array of ints. --Mark ]

Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/dlmglue.c | 116 ++++++++++++++++++-----------------------
fs/ocfs2/stackglue.c | 142 +++++++++++++++++++++++++++++++++++++++++++++++---
fs/ocfs2/stackglue.h | 6 +-
3 files changed, 188 insertions(+), 76 deletions(-)
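
The translation described above can be sketched in userspace C. This is a minimal illustration with a hypothetical, much smaller status enum (the demo_ and DEMO_ names are not from the patch). It shows an array of ints indexed by status, a bounds check that rejects any index at or past the end of the table, and the caller-side test that keeps a failed trylock from being logged as an error.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical subset of a DLM status enum. */
enum demo_status {
	DEMO_NORMAL = 0,
	DEMO_NOTQUEUED,
	DEMO_CANCELGRANT,
	DEMO_TIMEOUT,
	DEMO_MAXSTATS,
};

/* Designated initializers keep each mapping next to its status name. */
static const int demo_status_map[] = {
	[DEMO_NORMAL]		= 0,		/* Success */
	[DEMO_NOTQUEUED]	= -EAGAIN,	/* Trylock failed */
	[DEMO_CANCELGRANT]	= -EBUSY,	/* Cancel after grant */
	[DEMO_TIMEOUT]		= -ETIMEDOUT,
	[DEMO_MAXSTATS]		= -EINVAL,
};

#define DEMO_ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))

/* Translate a status to -errno; out-of-range values become -EINVAL. */
static int demo_status_to_errno(int status)
{
	if (status < 0 || (size_t)status >= DEMO_ARRAY_SIZE(demo_status_map))
		return -EINVAL;
	return demo_status_map[status];
}

/*
 * Caller-side check in the style of the dlmglue.c hunks: an -EAGAIN
 * from a trylock is an expected outcome, anything else is worth logging.
 */
static int demo_should_log(int trylock, int err)
{
	return err != 0 && (!trylock || err != -EAGAIN);
}
```

With the table in place, a caller can compare against -EAGAIN directly and never needs to see the stack-specific status type.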

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 5806d53..12a5213 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -329,10 +329,9 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
int convert);
-#define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \
- mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \
- "resource %s: %s\n", dlm_errname(_stat), _func, \
- _lockres->l_name, dlm_errmsg(_stat)); \
+#define ocfs2_log_dlm_error(_func, _err, _lockres) do { \
+ mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
+ _err, _func, _lockres->l_name); \
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
@@ -867,7 +866,6 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
u32 dlm_flags)
{
int ret = 0;
- enum dlm_status status = DLM_NORMAL;
unsigned long flags;

mlog_entry_void();
@@ -887,21 +885,19 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
spin_unlock_irqrestore(&lockres->l_lock, flags);

- status = ocfs2_dlm_lock(osb->dlm,
- level,
- &lockres->l_lksb,
- dlm_flags,
- lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- lockres);
- if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("ocfs2_dlm_lock", status, lockres);
- ret = -EINVAL;
+ ret = ocfs2_dlm_lock(osb->dlm,
+ level,
+ &lockres->l_lksb,
+ dlm_flags,
+ lockres->l_name,
+ OCFS2_LOCK_ID_MAX_LEN - 1,
+ lockres);
+ if (ret) {
+ ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 1);
}

- mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n",
- lockres->l_name);
+ mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
mlog_exit(ret);
@@ -1018,7 +1014,6 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,
int arg_flags)
{
struct ocfs2_mask_waiter mw;
- enum dlm_status status;
int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
unsigned long flags;
@@ -1089,21 +1084,18 @@ again:
lockres->l_name, lockres->l_level, level);

/* call dlm_lock to upgrade lock now */
- status = ocfs2_dlm_lock(osb->dlm,
- level,
- &lockres->l_lksb,
- lkm_flags,
- lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- lockres);
- if (status != DLM_NORMAL) {
- if ((lkm_flags & DLM_LKF_NOQUEUE) &&
- (status == DLM_NOTQUEUED))
- ret = -EAGAIN;
- else {
+ ret = ocfs2_dlm_lock(osb->dlm,
+ level,
+ &lockres->l_lksb,
+ lkm_flags,
+ lockres->l_name,
+ OCFS2_LOCK_ID_MAX_LEN - 1,
+ lockres);
+ if (ret) {
+ if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
+ (ret != -EAGAIN)) {
ocfs2_log_dlm_error("ocfs2_dlm_lock",
- status, lockres);
- ret = -EINVAL;
+ ret, lockres);
}
ocfs2_recover_from_dlm_error(lockres, 1);
goto out;
@@ -1502,10 +1494,8 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
ret = ocfs2_dlm_lock(osb->dlm, level, &lockres->l_lksb, lkm_flags,
lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
- if (ret != DLM_NORMAL) {
- if (trylock && ret == DLM_NOTQUEUED)
- ret = -EAGAIN;
- else {
+ if (ret) {
+ if (!trylock || (ret != -EAGAIN)) {
ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
ret = -EINVAL;
}
@@ -2573,7 +2563,7 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
mlog_exit_void();
}

-static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
+static void ocfs2_unlock_ast(void *opaque, int error)
{
struct ocfs2_lock_res *lockres = opaque;
unsigned long flags;
@@ -2589,7 +2579,7 @@ static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
* state. The wake_up call done at the bottom is redundant
* (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
* hurt anything anyway */
- if (status == DLM_CANCELGRANT &&
+ if (error == -DLM_ECANCEL &&
lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

@@ -2599,9 +2589,10 @@ static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
goto complete_unlock;
}

- if (status != DLM_NORMAL) {
- mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
- "unlock_action %d\n", status, lockres->l_name,
+ /* DLM_EUNLOCK is the success code for unlock */
+ if (error != -DLM_EUNLOCK) {
+ mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
+ "unlock_action %d\n", error, lockres->l_name,
lockres->l_unlock_action);
spin_unlock_irqrestore(&lockres->l_lock, flags);
return;
@@ -2632,7 +2623,7 @@ complete_unlock:
static int ocfs2_drop_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
- enum dlm_status status;
+ int ret;
unsigned long flags;
u32 lkm_flags = 0;

@@ -2696,10 +2687,10 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,

mlog(0, "lock %s\n", lockres->l_name);

- status = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, lkm_flags,
- lockres);
- if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
+ ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, lkm_flags,
+ lockres);
+ if (ret) {
+ ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
dlm_print_one_lock(lockres->l_lksb.lockid);
BUG();
@@ -2823,23 +2814,21 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
{
int ret;
u32 dlm_flags = DLM_LKF_CONVERT;
- enum dlm_status status;

mlog_entry_void();

if (lvb)
dlm_flags |= DLM_LKF_VALBLK;

- status = ocfs2_dlm_lock(osb->dlm,
- new_level,
- &lockres->l_lksb,
- dlm_flags,
- lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- lockres);
- if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("ocfs2_dlm_lock", status, lockres);
- ret = -EINVAL;
+ ret = ocfs2_dlm_lock(osb->dlm,
+ new_level,
+ &lockres->l_lksb,
+ dlm_flags,
+ lockres->l_name,
+ OCFS2_LOCK_ID_MAX_LEN - 1,
+ lockres);
+ if (ret) {
+ ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 1);
goto bail;
}
@@ -2886,19 +2875,14 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
int ret;
- enum dlm_status status;

mlog_entry_void();
mlog(0, "lock %s\n", lockres->l_name);

- ret = 0;
- status = ocfs2_dlm_unlock(osb->dlm,
- &lockres->l_lksb,
- DLM_LKF_CANCEL,
- lockres);
- if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
- ret = -EINVAL;
+ ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb,
+ DLM_LKF_CANCEL, lockres);
+ if (ret) {
+ ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 0);
}

diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 9953804..0aec2fc 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -18,6 +18,7 @@
* General Public License for more details.
*/

+#include "cluster/masklog.h"
#include "stackglue.h"

static struct ocfs2_locking_protocol *lproto;
@@ -77,7 +78,126 @@ static int flags_to_o2dlm(u32 flags)
}
#undef map_flag

-enum dlm_status ocfs2_dlm_lock(struct dlm_ctxt *dlm,
+/*
+ * Map an o2dlm status to standard errno values.
+ *
+ * o2dlm only uses a handful of these, and returns even fewer to the
+ * caller. Still, we try to assign sane values to each error.
+ *
+ * The following value pairs have special meanings to dlmglue, thus
+ * the right hand side needs to stay unique - never duplicate the
+ * mapping elsewhere in the table!
+ *
+ * DLM_NORMAL: 0
+ * DLM_NOTQUEUED: -EAGAIN
+ * DLM_CANCELGRANT: -DLM_ECANCEL
+ * DLM_CANCEL: -DLM_EUNLOCK
+ */
+/* Keep in sync with dlmapi.h */
+static int status_map[] = {
+ [DLM_NORMAL] = 0, /* Success */
+ [DLM_GRANTED] = -EINVAL,
+ [DLM_DENIED] = -EACCES,
+ [DLM_DENIED_NOLOCKS] = -EACCES,
+ [DLM_WORKING] = -EBUSY,
+ [DLM_BLOCKED] = -EINVAL,
+ [DLM_BLOCKED_ORPHAN] = -EINVAL,
+ [DLM_DENIED_GRACE_PERIOD] = -EACCES,
+ [DLM_SYSERR] = -ENOMEM, /* It is what it is */
+ [DLM_NOSUPPORT] = -EPROTO,
+ [DLM_CANCELGRANT] = -DLM_ECANCEL, /* Cancel after grant */
+ [DLM_IVLOCKID] = -EINVAL,
+ [DLM_SYNC] = -EINVAL,
+ [DLM_BADTYPE] = -EINVAL,
+ [DLM_BADRESOURCE] = -EINVAL,
+ [DLM_MAXHANDLES] = -ENOMEM,
+ [DLM_NOCLINFO] = -EINVAL,
+ [DLM_NOLOCKMGR] = -EINVAL,
+ [DLM_NOPURGED] = -EINVAL,
+ [DLM_BADARGS] = -EINVAL,
+ [DLM_VOID] = -EINVAL,
+ [DLM_NOTQUEUED] = -EAGAIN, /* Trylock failed */
+ [DLM_IVBUFLEN] = -EINVAL,
+ [DLM_CVTUNGRANT] = -EPERM,
+ [DLM_BADPARAM] = -EINVAL,
+ [DLM_VALNOTVALID] = -EINVAL,
+ [DLM_REJECTED] = -EPERM,
+ [DLM_ABORT] = -EINVAL,
+ [DLM_CANCEL] = -DLM_EUNLOCK, /* Successful cancel */
+ [DLM_IVRESHANDLE] = -EINVAL,
+ [DLM_DEADLOCK] = -EDEADLK,
+ [DLM_DENIED_NOASTS] = -EINVAL,
+ [DLM_FORWARD] = -EINVAL,
+ [DLM_TIMEOUT] = -ETIMEDOUT,
+ [DLM_IVGROUPID] = -EINVAL,
+ [DLM_VERS_CONFLICT] = -EOPNOTSUPP,
+ [DLM_BAD_DEVICE_PATH] = -ENOENT,
+ [DLM_NO_DEVICE_PERMISSION] = -EPERM,
+ [DLM_NO_CONTROL_DEVICE] = -ENOENT,
+ [DLM_RECOVERING] = -ENOTCONN,
+ [DLM_MIGRATING] = -ERESTART,
+ [DLM_MAXSTATS] = -EINVAL,
+};
+static int dlm_status_to_errno(enum dlm_status status)
+{
+ BUG_ON(status > (sizeof(status_map) / sizeof(status_map[0])));
+
+ return status_map[status];
+}
+
+static void o2dlm_lock_ast_wrapper(void *astarg)
+{
+ BUG_ON(lproto == NULL);
+
+ lproto->lp_lock_ast(astarg);
+}
+
+static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
+{
+ BUG_ON(lproto == NULL);
+
+ lproto->lp_blocking_ast(astarg, level);
+}
+
+static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
+{
+ int error;
+
+ BUG_ON(lproto == NULL);
+
+ /*
+ * XXX: CANCEL values are sketchy.
+ *
+ * Currently we have preserved the o2dlm paradigm. You can get
+ * unlock_ast() whether the cancel succeded or not.
+ *
+ * First, we're going to pass DLM_EUNLOCK just like fs/dlm does for
+ * successful unlocks. That is a clean behavior.
+ *
+ * In o2dlm, you can get both the lock_ast() for the lock being
+ * granted and the unlock_ast() for the CANCEL failing. A
+ * successful cancel sends DLM_NORMAL here. If the
+ * lock grant happened before the cancel arrived, you get
+ * DLM_CANCELGRANT. For now, we'll use DLM_ECANCEL to signify
+ * CANCELGRANT - the CANCEL was supposed to happen but didn't. We
+ * can then use DLM_EUNLOCK to signify a successful CANCEL -
+ * effectively, the CANCEL caused the lock to roll back.
+ *
+ * In the future, we will likely move the o2dlm to send only one
+ * ast - either unlock_ast() for a successful CANCEL or lock_ast()
+ * when the grant succeeds. At that point, we'll send DLM_ECANCEL
+ * for all cancel results (CANCELGRANT will no longer exist).
+ */
+ error = dlm_status_to_errno(status);
+
+ /* Successful unlock is DLM_EUNLOCK */
+ if (!error)
+ error = -DLM_EUNLOCK;
+
+ lproto->lp_unlock_ast(astarg, error);
+}
+
+int ocfs2_dlm_lock(struct dlm_ctxt *dlm,
int mode,
struct dlm_lockstatus *lksb,
u32 flags,
@@ -85,27 +205,35 @@ enum dlm_status ocfs2_dlm_lock(struct dlm_ctxt *dlm,
unsigned int namelen,
void *astarg)
{
+ enum dlm_status status;
int o2dlm_mode = mode_to_o2dlm(mode);
int o2dlm_flags = flags_to_o2dlm(flags);
+ int ret;

BUG_ON(lproto == NULL);

- return dlmlock(dlm, o2dlm_mode, lksb, o2dlm_flags, name, namelen,
- lproto->lp_lock_ast, astarg,
- lproto->lp_blocking_ast);
+ status = dlmlock(dlm, o2dlm_mode, lksb, o2dlm_flags, name, namelen,
+ o2dlm_lock_ast_wrapper, astarg,
+ o2dlm_blocking_ast_wrapper);
+ ret = dlm_status_to_errno(status);
+ return ret;
}

-enum dlm_status ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
+int ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
struct dlm_lockstatus *lksb,
u32 flags,
void *astarg)
{
+ enum dlm_status status;
int o2dlm_flags = flags_to_o2dlm(flags);
+ int ret;

BUG_ON(lproto == NULL);

- return dlmunlock(dlm, lksb, o2dlm_flags,
- lproto->lp_unlock_ast, astarg);
+ status = dlmunlock(dlm, lksb, o2dlm_flags,
+ o2dlm_unlock_ast_wrapper, astarg);
+ ret = dlm_status_to_errno(status);
+ return ret;
}


diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index 986d059..8ebcfba 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -37,17 +37,17 @@
struct ocfs2_locking_protocol {
void (*lp_lock_ast)(void *astarg);
void (*lp_blocking_ast)(void *astarg, int level);
- void (*lp_unlock_ast)(void *astarg, enum dlm_status status);
+ void (*lp_unlock_ast)(void *astarg, int error);
};

-enum dlm_status ocfs2_dlm_lock(struct dlm_ctxt *dlm,
+int ocfs2_dlm_lock(struct dlm_ctxt *dlm,
int mode,
struct dlm_lockstatus *lksb,
u32 flags,
void *name,
unsigned int namelen,
void *astarg);
-enum dlm_status ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
+int ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
struct dlm_lockstatus *lksb,
u32 flags,
void *astarg);
--
1.5.3.8
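
For readers following the status_map[] table in this patch, here is a minimal
userspace sketch of the same table-driven idea. It is not the kernel code: the
enum, the SK_* names and the two cancel/unlock constants are hypothetical
stand-ins, and assert() takes the place of BUG_ON(). The sketch rejects any
index at or past the end of the table, which is an assumption about the
intended bound.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical stand-in for a handful of o2dlm status codes. */
enum sketch_status {
	SK_NORMAL = 0,
	SK_NOTQUEUED,
	SK_CANCELGRANT,
	SK_CANCEL,
	SK_TIMEOUT,
	SK_MAXSTATS,
};

/* Assumed placeholder values standing in for DLM_ECANCEL / DLM_EUNLOCK. */
#define SK_ECANCEL 0x10001
#define SK_EUNLOCK 0x10002

/* Designated initializers keep each status next to its errno. */
static const int sketch_status_map[] = {
	[SK_NORMAL]      = 0,            /* success */
	[SK_NOTQUEUED]   = -EAGAIN,      /* trylock failed */
	[SK_CANCELGRANT] = -SK_ECANCEL,  /* cancel arrived after the grant */
	[SK_CANCEL]      = -SK_EUNLOCK,  /* successful cancel */
	[SK_TIMEOUT]     = -ETIMEDOUT,
	[SK_MAXSTATS]    = -EINVAL,
};

#define SK_ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))

static int sketch_status_to_errno(enum sketch_status status)
{
	/* Valid indexes are 0 .. size - 1, so anything >= size is a bug. */
	assert((size_t)status < SK_ARRAY_SIZE(sketch_status_map));

	return sketch_status_map[status];
}
```

The four "special" pairs stay unique on the right-hand side, so a caller can
test for -EAGAIN or the cancel codes without ambiguity.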

2008-03-06 00:39:34

by Joel Becker

Subject: [PATCH 10/18] ocfs2: handle async EAGAIN from NOQUEUE request

From: David Teigland <[email protected]>

This is the stack-glue equivalent of what I had on the dlm_eviction_cb
branch. When using fsdlm, -EAGAIN is returned in the async callback for
NOQUEUE requests.
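
As a rough illustration of the flow described above, the following userspace
sketch models a trylock whose refusal arrives in the completion callback
rather than as the return value of the submit call. Everything here is a
hypothetical stand-in (struct sk_lockres, sk_locking_ast(), sk_trylock());
locking and the real DLM are left out, and errors other than -EAGAIN are out
of scope.

```c
#include <assert.h>
#include <errno.h>

#define SK_LOCK_BUSY 0x1u

/* Hypothetical, heavily reduced lock resource. */
struct sk_lockres {
	unsigned int flags;
	int level;        /* level currently granted */
	int requested;    /* level the pending request asked for */
	int lksb_status;  /* status the DLM left for the callback */
};

/* Completion callback: models the -EAGAIN branch in the lock ast. */
static void sk_locking_ast(struct sk_lockres *res)
{
	if (res->lksb_status == -EAGAIN) {
		/* Trylock refused: the request is over, level is unchanged. */
		res->flags &= ~SK_LOCK_BUSY;
		return;
	}

	/* Other errors are not modelled in this sketch. */
	assert(res->lksb_status == 0);

	res->level = res->requested;
	res->flags &= ~SK_LOCK_BUSY;
}

/*
 * Trylock: the submit itself succeeds, so the caller has to compare the
 * granted level with the requested one after the callback has run.
 * dlm_answer stands in for whatever the DLM would report asynchronously.
 */
static int sk_trylock(struct sk_lockres *res, int level, int dlm_answer)
{
	res->requested = level;
	res->flags |= SK_LOCK_BUSY;

	res->lksb_status = dlm_answer;  /* stand-in for submit + completion */
	sk_locking_ast(res);

	if (level > res->level)
		return -EAGAIN;         /* trylock failed asynchronously */
	return 0;
}
```

The point of the sketch is only that BUSY is cleared in both outcomes, and
that the caller learns about the refusal by noticing the level did not rise.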

Signed-off-by: David Teigland <[email protected]>
Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/dlmglue.c | 27 +++++++++++++++++++++++----
1 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 99f3de1..d48163f 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -880,13 +880,20 @@ static void ocfs2_locking_ast(void *opaque)
struct ocfs2_lock_res *lockres = opaque;
struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
unsigned long flags;
+ int status;

spin_lock_irqsave(&lockres->l_lock, flags);

- if (ocfs2_dlm_lock_status(&lockres->l_lksb)) {
+ status = ocfs2_dlm_lock_status(&lockres->l_lksb);
+
+ if (status == -EAGAIN) {
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+ goto out;
+ }
+
+ if (status) {
mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
- lockres->l_name,
- ocfs2_dlm_lock_status(&lockres->l_lksb));
+ lockres->l_name, status);
spin_unlock_irqrestore(&lockres->l_lock, flags);
return;
}
@@ -909,7 +916,7 @@ static void ocfs2_locking_ast(void *opaque)
lockres->l_unlock_action);
BUG();
}
-
+out:
/* set it to something invalid so if we get called again we
* can catch it. */
lockres->l_action = OCFS2_AST_INVALID;
@@ -1113,6 +1120,7 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,
int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
unsigned long flags;
unsigned int gen;
+ int noqueue_attempted = 0;

mlog_entry_void();

@@ -1157,6 +1165,13 @@ again:
}

if (level > lockres->l_level) {
+ if (noqueue_attempted > 0) {
+ ret = -EAGAIN;
+ goto unlock;
+ }
+ if (lkm_flags & DLM_LKF_NOQUEUE)
+ noqueue_attempted = 1;
+
if (lockres->l_action != OCFS2_AST_INVALID)
mlog(ML_ERROR, "lockres %s has action %u pending\n",
lockres->l_name, lockres->l_action);
@@ -1621,6 +1636,10 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
* to just bubble sucess back up to the user.
*/
ret = ocfs2_flock_handle_signal(lockres, level);
+ } else if (!ret && (level > lockres->l_level)) {
+ /* Trylock failed asynchronously */
+ BUG_ON(!trylock);
+ ret = -EAGAIN;
}

out:
--
1.5.3.8

2008-03-06 00:40:57

by Joel Becker

Subject: [PATCH 09/18] ocfs2: Remove CANCELGRANT from the view of dlmglue.

o2dlm has the non-standard behavior of providing a cancel callback
(unlock_ast) even when the cancel has failed (the locking operation
succeeded without canceling). This is called CANCELGRANT after the
status code sent to the callback. fs/dlm does not provide this
callback, so dlmglue must be changed to live without it.
o2dlm_unlock_ast_wrapper() in stackglue now ignores CANCELGRANT calls.

Because dlmglue no longer sees CANCELGRANT, ocfs2_unlock_ast() no longer
needs to check for it. ocfs2_locking_ast() must catch that a cancel was
tried and clear the cancel state.
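
A minimal userspace sketch of the wrapper behavior described above, assuming
a reduced status enum and a single registered upper-layer callback. The SK_*
names, sk_upper_unlock_ast and struct sk_seen are hypothetical and exist only
for this illustration; assert() stands in for BUG_ON().

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Assumed placeholder value standing in for DLM_ECANCEL. */
#define SK_ECANCEL 0x10001

/* Hypothetical subset of the o2dlm status codes. */
enum sk_status { SK_NORMAL = 0, SK_CANCELGRANT, SK_CANCEL };

typedef void (*sk_unlock_ast_fn)(void *astarg, int error);

/* Upper-layer (dlmglue-like) callback, registered by the caller. */
static sk_unlock_ast_fn sk_upper_unlock_ast;

static int sk_status_to_errno(enum sk_status status)
{
	switch (status) {
	case SK_NORMAL:
		return 0;
	case SK_CANCELGRANT:
		return -EBUSY;
	case SK_CANCEL:
		return -SK_ECANCEL;
	}
	return -EINVAL;
}

/* Glue-layer wrapper: swallow CANCELGRANT, forward everything else. */
static void sk_unlock_ast_wrapper(void *astarg, enum sk_status status)
{
	assert(sk_upper_unlock_ast != NULL);

	/* The lock ast is expected to handle the granted lock instead. */
	if (status == SK_CANCELGRANT)
		return;

	sk_upper_unlock_ast(astarg, sk_status_to_errno(status));
}

/* Demo callback that records what the upper layer was told. */
struct sk_seen {
	int calls;
	int last_error;
};

static void sk_record_unlock(void *astarg, int error)
{
	struct sk_seen *seen = astarg;

	seen->calls++;
	seen->last_error = error;
}
```

With this shape the upper layer never needs a special case for the
"cancel lost the race" outcome; it only sees real unlock results.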

Making these changes opens up a locking race. dlmglue uses the
OCFS2_LOCK_BUSY flag to ensure only one thread is calling the dlm at any
one time. But dlmglue must unlock the lockres before calling into the
dlm. In the small window of time between unlocking the lockres and
calling the dlm, the downconvert thread can try to cancel the lock. The
downconvert thread is checking the OCFS2_LOCK_BUSY flag - it doesn't
know that ocfs2_dlm_lock() has not yet been called.

Because ocfs2_dlm_lock() has not yet been called, the cancel operation
will just be a no-op. There's nothing to cancel. With CANCELGRANT,
dlmglue uses the CANCELGRANT callback to clear up the cancel state.
When it comes around again, it will retry the cancel. Eventually, the
first thread will have called into ocfs2_dlm_lock(), and either the
lock or the cancel will succeed. The downconvert thread can then do its
downconvert.

Without CANCELGRANT, there is nothing to clean up the cancellation
state. The downconvert thread does not know to retry its operations.
More importantly, the original lock may be blocking on the other node
that is trying to cancel us. With neither able to make progress, the
ast is never called and the cancellation state is never cleaned up that
way. dlmglue is deadlocked.

The OCFS2_LOCK_PENDING flag is introduced to remedy this window. It is
set at the same time OCFS2_LOCK_BUSY is. Thus, the downconvert thread
can check whether the lock is cancelable. If not, it just loops around
to try again. Once ocfs2_dlm_lock() is called, the thread then clears
OCFS2_LOCK_PENDING and wakes the downconvert thread. Now, if the
downconvert thread finds the lock BUSY, it can safely try to cancel it.
Whether the cancel works or not, the state will be properly set and the
lock processing can continue.
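
The PENDING handshake, together with the l_pending_gen counter this patch
adds in dlmglue.c, can be sketched in userspace roughly as follows. This is a
single-threaded model under stated assumptions: the l_lock spinlock and the
downconvert-thread wakeup are omitted, and struct sk_lockres and the sk_*
helpers are hypothetical names, not the kernel functions.

```c
#include <assert.h>

#define SK_BUSY    0x1u
#define SK_PENDING 0x2u

/* Hypothetical, reduced lock resource: just flags and a generation. */
struct sk_lockres {
	unsigned int flags;
	unsigned int pending_gen;
};

/* Caller has just set BUSY: mark the request as not yet in the DLM. */
static unsigned int sk_set_pending(struct sk_lockres *res)
{
	assert(res->flags & SK_BUSY);

	res->flags |= SK_PENDING;
	return res->pending_gen;  /* token for the matching clear */
}

/*
 * Clear PENDING only on behalf of the request that set it.
 * Returns 1 if the flag was cleared, 0 if the call was stale.
 */
static int sk_clear_pending(struct sk_lockres *res, unsigned int gen)
{
	if (!(res->flags & SK_PENDING) || res->pending_gen != gen)
		return 0;

	res->flags &= ~SK_PENDING;
	res->pending_gen++;       /* invalidate any outstanding token */
	return 1;
}

/* Downconvert side: a BUSY lock is cancelable only once PENDING is gone. */
static int sk_can_cancel(const struct sk_lockres *res)
{
	return (res->flags & SK_BUSY) && !(res->flags & SK_PENDING);
}
```

Replaying the race from the patch comment against this model: the first
caller takes a token, the ast clears PENDING first and bumps the generation,
a second request sets PENDING again, and the first caller's late clear is
then ignored because its token no longer matches.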

Signed-off-by: Joel Becker <[email protected]>
Signed-off-by: Mark Fasheh <[email protected]>
---
fs/ocfs2/dlmglue.c | 199 +++++++++++++++++++++++++++++++++++++++++++-------
fs/ocfs2/ocfs2.h | 4 +
fs/ocfs2/stackglue.c | 40 +++-------
3 files changed, 188 insertions(+), 55 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 9876857..99f3de1 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -311,12 +311,13 @@ static int ocfs2_inode_lock_update(struct inode *inode,
struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
-static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
- int new_level);
+static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
+ int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int new_level,
- int lvb);
+ int lvb,
+ unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
@@ -736,6 +737,113 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
return needs_downconvert;
}

+/*
+ * OCFS2_LOCK_PENDING and l_pending_gen.
+ *
+ * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
+ * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
+ * for more details on the race.
+ *
+ * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
+ * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
+ * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
+ * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
+ * the caller is going to try to clear PENDING again. If nothing else is
+ * happening, __lockres_clear_pending() sees PENDING is unset and does
+ * nothing.
+ *
+ * But what if another path (eg downconvert thread) has just started a
+ * new locking action? The other path has re-set PENDING. Our path
+ * cannot clear PENDING, because that will re-open the original race
+ * window.
+ *
+ * [Example]
+ *
+ * ocfs2_meta_lock()
+ *  ocfs2_cluster_lock()
+ *   set BUSY
+ *   set PENDING
+ *   drop l_lock
+ *   ocfs2_dlm_lock()
+ *    ocfs2_locking_ast()            ocfs2_downconvert_thread()
+ *     clear PENDING                  ocfs2_unblock_lock()
+ *                                     take_l_lock
+ *                                     !BUSY
+ *                                     ocfs2_prepare_downconvert()
+ *                                      set BUSY
+ *                                      set PENDING
+ *                                     drop l_lock
+ *   take l_lock
+ *   clear PENDING
+ *   drop l_lock
+ *                       <window>
+ *                                     ocfs2_dlm_lock()
+ *
+ * So as you can see, we now have a window where l_lock is not held,
+ * PENDING is not set, and ocfs2_dlm_lock() has not been called.
+ *
+ * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
+ * set by ocfs2_prepare_downconvert(). That wasn't nice.
+ *
+ * To solve this we introduce l_pending_gen. A call to
+ * lockres_clear_pending() will only do so when it is passed a generation
+ * number that matches the lockres. lockres_set_pending() will return the
+ * current generation number. When ocfs2_cluster_lock() goes to clear
+ * PENDING, it passes the generation it got from set_pending(). In our
+ * example above, the generation numbers will *not* match. Thus,
+ * ocfs2_cluster_lock() will not clear the PENDING set by
+ * ocfs2_prepare_downconvert().
+ */
+
+/* Unlocked version for ocfs2_locking_ast() */
+static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
+ unsigned int generation,
+ struct ocfs2_super *osb)
+{
+ assert_spin_locked(&lockres->l_lock);
+
+ /*
+ * The ast and locking functions can race us here. The winner
+ * will clear pending, the loser will not.
+ */
+ if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
+ (lockres->l_pending_gen != generation))
+ return;
+
+ lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
+ lockres->l_pending_gen++;
+
+ /*
+ * The downconvert thread may have skipped us because we
+ * were PENDING. Wake it up.
+ */
+ if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
+ ocfs2_wake_downconvert_thread(osb);
+}
+
+/* Locked version for callers of ocfs2_dlm_lock() */
+static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
+ unsigned int generation,
+ struct ocfs2_super *osb)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ __lockres_clear_pending(lockres, generation, osb);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+}
+
+static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
+{
+ assert_spin_locked(&lockres->l_lock);
+ BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
+
+ lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
+
+ return lockres->l_pending_gen;
+}
+
+
static void ocfs2_blocking_ast(void *opaque, int level)
{
struct ocfs2_lock_res *lockres = opaque;
@@ -770,6 +878,7 @@ static void ocfs2_blocking_ast(void *opaque, int level)
static void ocfs2_locking_ast(void *opaque)
{
struct ocfs2_lock_res *lockres = opaque;
+ struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
unsigned long flags;

spin_lock_irqsave(&lockres->l_lock, flags);
@@ -805,6 +914,18 @@ static void ocfs2_locking_ast(void *opaque)
* can catch it. */
lockres->l_action = OCFS2_AST_INVALID;

+ /* Did we try to cancel this lock? Clear that state */
+ if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
+ lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
+
+ /*
+ * We may have beaten the locking functions here. We certainly
+ * know that dlm_lock() has been called :-)
+ * Because we can't have two lock calls in flight at once, we
+ * can use lockres->l_pending_gen.
+ */
+ __lockres_clear_pending(lockres, lockres->l_pending_gen, osb);
+
wake_up(&lockres->l_event);
spin_unlock_irqrestore(&lockres->l_lock, flags);
}
@@ -838,6 +959,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
{
int ret = 0;
unsigned long flags;
+ unsigned int gen;

mlog_entry_void();

@@ -854,6 +976,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
lockres->l_action = OCFS2_AST_ATTACH;
lockres->l_requested = level;
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+ gen = lockres_set_pending(lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);

ret = ocfs2_dlm_lock(osb->cconn,
@@ -863,6 +986,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
lockres->l_name,
OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
+ lockres_clear_pending(lockres, gen, osb);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 1);
@@ -988,6 +1112,7 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,
int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
unsigned long flags;
+ unsigned int gen;

mlog_entry_void();

@@ -1046,6 +1171,7 @@ again:

lockres->l_requested = level;
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+ gen = lockres_set_pending(lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);

BUG_ON(level == DLM_LOCK_IV);
@@ -1062,6 +1188,7 @@ again:
lockres->l_name,
OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
+ lockres_clear_pending(lockres, gen, osb);
if (ret) {
if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
(ret != -EAGAIN)) {
@@ -1506,6 +1633,7 @@ out:
void ocfs2_file_unlock(struct file *file)
{
int ret;
+ unsigned int gen;
unsigned long flags;
struct ocfs2_file_private *fp = file->private_data;
struct ocfs2_lock_res *lockres = &fp->fp_flock;
@@ -1531,11 +1659,11 @@ void ocfs2_file_unlock(struct file *file)
lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
lockres->l_blocking = DLM_LOCK_EX;

- ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
+ gen = ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
spin_unlock_irqrestore(&lockres->l_lock, flags);

- ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0);
+ ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0, gen);
if (ret) {
mlog_errno(ret);
return;
@@ -2555,23 +2683,7 @@ static void ocfs2_unlock_ast(void *opaque, int error)
lockres->l_unlock_action);

spin_lock_irqsave(&lockres->l_lock, flags);
- /* We tried to cancel a convert request, but it was already
- * granted. All we want to do here is clear our unlock
- * state. The wake_up call done at the bottom is redundant
- * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
- * hurt anything anyway */
- if (error == -DLM_ECANCEL &&
- lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
- mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
-
- /* We don't clear the busy flag in this case as it
- * should have been cleared by the ast which the dlm
- * has called. */
- goto complete_unlock;
- }
-
- /* DLM_EUNLOCK is the success code for unlock */
- if (error != -DLM_EUNLOCK) {
+ if (error) {
mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
"unlock_action %d\n", error, lockres->l_name,
lockres->l_unlock_action);
@@ -2592,7 +2704,6 @@ static void ocfs2_unlock_ast(void *opaque, int error)
}

lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
-complete_unlock:
lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
spin_unlock_irqrestore(&lockres->l_lock, flags);

@@ -2768,8 +2879,8 @@ int ocfs2_drop_inode_locks(struct inode *inode)
return status;
}

-static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
- int new_level)
+static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
+ int new_level)
{
assert_spin_locked(&lockres->l_lock);

@@ -2787,12 +2898,14 @@ static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
lockres->l_action = OCFS2_AST_DOWNCONVERT;
lockres->l_requested = new_level;
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+ return lockres_set_pending(lockres);
}

static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int new_level,
- int lvb)
+ int lvb,
+ unsigned int generation)
{
int ret;
u32 dlm_flags = DLM_LKF_CONVERT;
@@ -2809,6 +2922,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
lockres->l_name,
OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
+ lockres_clear_pending(lockres, generation, osb);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 1);
@@ -2883,6 +2997,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
int new_level;
int ret = 0;
int set_lvb = 0;
+ unsigned int gen;

mlog_entry_void();

@@ -2892,6 +3007,32 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,

recheck:
if (lockres->l_flags & OCFS2_LOCK_BUSY) {
+ /* XXX
+ * This is a *big* race. The OCFS2_LOCK_PENDING flag
+ * exists entirely for one reason - another thread has set
+ * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
+ *
+ * If we do ocfs2_cancel_convert() before the other thread
+ * calls dlm_lock(), our cancel will do nothing. We will
+ * get no ast, and we will have no way of knowing the
+ * cancel failed. Meanwhile, the other thread will call
+ * into dlm_lock() and wait...forever.
+ *
+ * Why forever? Because another node has asked for the
+ * lock first; that's why we're here in unblock_lock().
+ *
+ * The solution is OCFS2_LOCK_PENDING. When PENDING is
+ * set, we just requeue the unblock. Only when the other
+ * thread has called dlm_lock() and cleared PENDING will
+ * we then cancel their request.
+ *
+ * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
+ * at the same time they set OCFS2_LOCK_BUSY. They must
+ * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
+ */
+ if (lockres->l_flags & OCFS2_LOCK_PENDING)
+ goto leave_requeue;
+
ctl->requeue = 1;
ret = ocfs2_prepare_cancel_convert(osb, lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);
@@ -2971,9 +3112,11 @@ downconvert:
lockres->l_ops->set_lvb(lockres);
}

- ocfs2_prepare_downconvert(lockres, new_level);
+ gen = ocfs2_prepare_downconvert(lockres, new_level);
spin_unlock_irqrestore(&lockres->l_lock, flags);
- ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
+ ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
+ gen);
+
leave:
mlog_exit(ret);
return ret;
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 31dc28b..af929ec 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -98,6 +98,9 @@ enum ocfs2_unlock_action {
* dropped. */
#define OCFS2_LOCK_QUEUED (0x00000100) /* queued for downconvert */
#define OCFS2_LOCK_NOCACHE (0x00000200) /* don't use a holder count */
+#define OCFS2_LOCK_PENDING (0x00000400) /* This lockres is pending a
+ call to dlm_lock. Only
+ exists with BUSY set. */

struct ocfs2_lock_res_ops;

@@ -124,6 +127,7 @@ struct ocfs2_lock_res {
enum ocfs2_unlock_action l_unlock_action;
int l_requested;
int l_blocking;
+ unsigned int l_pending_gen;

wait_queue_head_t l_event;

diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 670fa94..abdb9f6 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -104,8 +104,8 @@ static int flags_to_o2dlm(u32 flags)
*
* DLM_NORMAL: 0
* DLM_NOTQUEUED: -EAGAIN
- * DLM_CANCELGRANT: -DLM_ECANCEL
- * DLM_CANCEL: -DLM_EUNLOCK
+ * DLM_CANCELGRANT: -EBUSY
+ * DLM_CANCEL: -DLM_ECANCEL
*/
/* Keep in sync with dlmapi.h */
static int status_map[] = {
@@ -113,13 +113,13 @@ static int status_map[] = {
[DLM_GRANTED] = -EINVAL,
[DLM_DENIED] = -EACCES,
[DLM_DENIED_NOLOCKS] = -EACCES,
- [DLM_WORKING] = -EBUSY,
+ [DLM_WORKING] = -EACCES,
[DLM_BLOCKED] = -EINVAL,
[DLM_BLOCKED_ORPHAN] = -EINVAL,
[DLM_DENIED_GRACE_PERIOD] = -EACCES,
[DLM_SYSERR] = -ENOMEM, /* It is what it is */
[DLM_NOSUPPORT] = -EPROTO,
- [DLM_CANCELGRANT] = -DLM_ECANCEL, /* Cancel after grant */
+ [DLM_CANCELGRANT] = -EBUSY, /* Cancel after grant */
[DLM_IVLOCKID] = -EINVAL,
[DLM_SYNC] = -EINVAL,
[DLM_BADTYPE] = -EINVAL,
@@ -137,7 +137,7 @@ static int status_map[] = {
[DLM_VALNOTVALID] = -EINVAL,
[DLM_REJECTED] = -EPERM,
[DLM_ABORT] = -EINVAL,
- [DLM_CANCEL] = -DLM_EUNLOCK, /* Successful cancel */
+ [DLM_CANCEL] = -DLM_ECANCEL, /* Successful cancel */
[DLM_IVRESHANDLE] = -EINVAL,
[DLM_DEADLOCK] = -EDEADLK,
[DLM_DENIED_NOASTS] = -EINVAL,
@@ -152,6 +152,7 @@ static int status_map[] = {
[DLM_MIGRATING] = -ERESTART,
[DLM_MAXSTATS] = -EINVAL,
};
+
static int dlm_status_to_errno(enum dlm_status status)
{
BUG_ON(status > (sizeof(status_map) / sizeof(status_map[0])));
@@ -175,38 +176,23 @@ static void o2dlm_blocking_ast_wrapper(void *astarg, int level)

static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
{
- int error;
+ int error = dlm_status_to_errno(status);

BUG_ON(lproto == NULL);

/*
- * XXX: CANCEL values are sketchy.
- *
- * Currently we have preserved the o2dlm paradigm. You can get
- * unlock_ast() whether the cancel succeded or not.
- *
- * First, we're going to pass DLM_EUNLOCK just like fs/dlm does for
- * successful unlocks. That is a clean behavior.
- *
* In o2dlm, you can get both the lock_ast() for the lock being
* granted and the unlock_ast() for the CANCEL failing. A
* successful cancel sends DLM_NORMAL here. If the
* lock grant happened before the cancel arrived, you get
- * DLM_CANCELGRANT. For now, we'll use DLM_ECANCEL to signify
- * CANCELGRANT - the CANCEL was supposed to happen but didn't. We
- * can then use DLM_EUNLOCK to signify a successful CANCEL -
- * effectively, the CANCEL caused the lock to roll back.
+ * DLM_CANCELGRANT.
*
- * In the future, we will likely move the o2dlm to send only one
- * ast - either unlock_ast() for a successful CANCEL or lock_ast()
- * when the grant succeeds. At that point, we'll send DLM_ECANCEL
- * for all cancel results (CANCELGRANT will no longer exist).
+ * There's no need for the double-ast. If we see DLM_CANCELGRANT,
+ * we just ignore it. We expect the lock_ast() to handle the
+ * granted lock.
*/
- error = dlm_status_to_errno(status);
-
- /* Successful unlock is DLM_EUNLOCK */
- if (!error)
- error = -DLM_EUNLOCK;
+ if (status == DLM_CANCELGRANT)
+ return;

lproto->lp_unlock_ast(astarg, error);
}
--
1.5.3.8