2010-02-10 09:29:30

by Joel Becker

[permalink] [raw]
Subject: [0/11] ocfs2_dlmfs improvements v2

Changes from v1:
- Ignore truncation of dlmfs files. The LVB is a constant size.
- Check the LVB for validity before copying it to userspace. LVBs can
become invalid during lock recovery.
- This has now been tested to pass lvb_torture on two nodes with fsdlm

ocfs2 ships with its own cluster stack, o2cb. The dlm portion, o2dlm,
is accessible to userspace via a custom filesystem, ocfs2_dlmfs. Files
in this filesytem represent cluster lock resources. Open the file, and
you take the lock. The libo2dlm library wraps this filesystem,
providing a very simplified cluster locking API to userspace programs.

One of the things left out of the simplified interface is the ability to
learn that other nodes want a lock. Notification that another node
wants the lock is called a "blocking asynchronous system trap", or
"bast". Kernel users of o2dlm can register basts as they please, but
programs using libo2dlm and ocfs2_dlmfs cannot. Thus, they must not
hold a lock forever; other nodes will never get the lock.

The first improvement in this series adds poll(2) support to
ocfs2_dlmfs. POLLIN on an open ocfs2_dlmfs flie signals a bast. Now
user programs can hold a lock until they are notified to release it.

Another limitation of ocfs2_dlmfs is its reliance on o2dlm and thus the
entire o2cb cluster stack. The ocfs2 filesystem has a glue layer,
called "stackglue", that allows it to switch between o2cb+o2dlm and the
userspace cluster stacks that work with fs/dlm. This means userspace
programs for ocfs2 have to know about libo2dlm when o2dlm is employed
and libdlm when fs/dlm is employed.

The second improvement is to make ocfs2_dlmfs use stackglue. Instead of
directly calling o2dlm APIs, it uses stackglue to remain agnostic of the
cluster stack. Now a system using a userspace cluster stack and fs/dlm
can mount ocfs2_dlmfs and use libo2dlm. This benefits everyone.

Joel

[Pull]
git://git.kernel.org/pub/scm/linux/kernel/git/jlbec/ocfs2.git dlmfs-stackglue
[View]
http://git.kernel.org/?p=linux/kernel/git/jlbec/ocfs2.git;a=shortlog;h=refs/heads/dlmfs-stackglue

fs/ocfs2/Makefile | 1 +
fs/ocfs2/dlm/Makefile | 3 +-
fs/ocfs2/dlmfs/Makefile | 5 +
fs/ocfs2/{dlm => dlmfs}/dlmfs.c | 127 +++++++++++++-----
fs/ocfs2/{dlm => dlmfs}/dlmfsver.c | 0
fs/ocfs2/{dlm => dlmfs}/dlmfsver.h | 0
fs/ocfs2/{dlm => dlmfs}/userdlm.c | 266 ++++++++++++++++++------------------
fs/ocfs2/{dlm => dlmfs}/userdlm.h | 16 +-
fs/ocfs2/dlmglue.c | 198 +++++++++++++--------------
fs/ocfs2/ocfs2.h | 2 +-
fs/ocfs2/ocfs2_lockingver.h | 2 +
fs/ocfs2/stack_o2cb.c | 37 +++---
fs/ocfs2/stack_user.c | 49 +++----
fs/ocfs2/stackglue.c | 98 +++++++++-----
fs/ocfs2/stackglue.h | 95 ++++++++------
15 files changed, 502 insertions(+), 397 deletions(-)



2010-02-10 09:29:26

by Joel Becker

[permalink] [raw]
Subject: [PATCH 03/11] ocfs2_dlmfs: Move to its own directory

We're going to remove the tie between ocfs2_dlmfs and o2dlm.
ocfs2_dlmfs doesn't belong in the fs/ocfs2/dlm directory anymore. Here
we move it to fs/ocfs2/dlmfs.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/Makefile | 1 +
fs/ocfs2/dlm/Makefile | 3 +--
fs/ocfs2/dlmfs/Makefile | 5 +++++
fs/ocfs2/{dlm => dlmfs}/dlmfs.c | 2 +-
fs/ocfs2/{dlm => dlmfs}/dlmfsver.c | 0
fs/ocfs2/{dlm => dlmfs}/dlmfsver.h | 0
fs/ocfs2/{dlm => dlmfs}/userdlm.c | 2 +-
fs/ocfs2/{dlm => dlmfs}/userdlm.h | 0
8 files changed, 9 insertions(+), 4 deletions(-)
create mode 100644 fs/ocfs2/dlmfs/Makefile
rename fs/ocfs2/{dlm => dlmfs}/dlmfs.c (99%)
rename fs/ocfs2/{dlm => dlmfs}/dlmfsver.c (100%)
rename fs/ocfs2/{dlm => dlmfs}/dlmfsver.h (100%)
rename fs/ocfs2/{dlm => dlmfs}/userdlm.c (99%)
rename fs/ocfs2/{dlm => dlmfs}/userdlm.h (100%)

diff --git a/fs/ocfs2/Makefile b/fs/ocfs2/Makefile
index 600d2d2..791c088 100644
--- a/fs/ocfs2/Makefile
+++ b/fs/ocfs2/Makefile
@@ -46,6 +46,7 @@ ocfs2_stackglue-objs := stackglue.o
ocfs2_stack_o2cb-objs := stack_o2cb.o
ocfs2_stack_user-objs := stack_user.o

+obj-$(CONFIG_OCFS2_FS) += dlmfs/
# cluster/ is always needed when OCFS2_FS for masklog support
obj-$(CONFIG_OCFS2_FS) += cluster/
obj-$(CONFIG_OCFS2_FS_O2CB) += dlm/
diff --git a/fs/ocfs2/dlm/Makefile b/fs/ocfs2/dlm/Makefile
index 1903613..dcebf0d 100644
--- a/fs/ocfs2/dlm/Makefile
+++ b/fs/ocfs2/dlm/Makefile
@@ -1,8 +1,7 @@
EXTRA_CFLAGS += -Ifs/ocfs2

-obj-$(CONFIG_OCFS2_FS_O2CB) += ocfs2_dlm.o ocfs2_dlmfs.o
+obj-$(CONFIG_OCFS2_FS_O2CB) += ocfs2_dlm.o

ocfs2_dlm-objs := dlmdomain.o dlmdebug.o dlmthread.o dlmrecovery.o \
dlmmaster.o dlmast.o dlmconvert.o dlmlock.o dlmunlock.o dlmver.o

-ocfs2_dlmfs-objs := userdlm.o dlmfs.o dlmfsver.o
diff --git a/fs/ocfs2/dlmfs/Makefile b/fs/ocfs2/dlmfs/Makefile
new file mode 100644
index 0000000..df69b48
--- /dev/null
+++ b/fs/ocfs2/dlmfs/Makefile
@@ -0,0 +1,5 @@
+EXTRA_CFLAGS += -Ifs/ocfs2
+
+obj-$(CONFIG_OCFS2_FS) += ocfs2_dlmfs.o
+
+ocfs2_dlmfs-objs := userdlm.o dlmfs.o dlmfsver.o
diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlmfs/dlmfs.c
similarity index 99%
rename from fs/ocfs2/dlm/dlmfs.c
rename to fs/ocfs2/dlmfs/dlmfs.c
index ddf55da..e21ce0e 100644
--- a/fs/ocfs2/dlm/dlmfs.c
+++ b/fs/ocfs2/dlmfs/dlmfs.c
@@ -52,7 +52,7 @@
#include "cluster/heartbeat.h"
#include "cluster/tcp.h"

-#include "dlmapi.h"
+#include "dlm/dlmapi.h"

#include "userdlm.h"

diff --git a/fs/ocfs2/dlm/dlmfsver.c b/fs/ocfs2/dlmfs/dlmfsver.c
similarity index 100%
rename from fs/ocfs2/dlm/dlmfsver.c
rename to fs/ocfs2/dlmfs/dlmfsver.c
diff --git a/fs/ocfs2/dlm/dlmfsver.h b/fs/ocfs2/dlmfs/dlmfsver.h
similarity index 100%
rename from fs/ocfs2/dlm/dlmfsver.h
rename to fs/ocfs2/dlmfs/dlmfsver.h
diff --git a/fs/ocfs2/dlm/userdlm.c b/fs/ocfs2/dlmfs/userdlm.c
similarity index 99%
rename from fs/ocfs2/dlm/userdlm.c
rename to fs/ocfs2/dlmfs/userdlm.c
index 4cb1d3d..6adae70 100644
--- a/fs/ocfs2/dlm/userdlm.c
+++ b/fs/ocfs2/dlmfs/userdlm.c
@@ -39,7 +39,7 @@
#include "cluster/heartbeat.h"
#include "cluster/tcp.h"

-#include "dlmapi.h"
+#include "dlm/dlmapi.h"

#include "userdlm.h"

diff --git a/fs/ocfs2/dlm/userdlm.h b/fs/ocfs2/dlmfs/userdlm.h
similarity index 100%
rename from fs/ocfs2/dlm/userdlm.h
rename to fs/ocfs2/dlmfs/userdlm.h
--
1.6.6.1

2010-02-10 09:29:34

by Joel Becker

[permalink] [raw]
Subject: [PATCH 07/11] ocfs2: Remove the ast pointers from ocfs2_stack_plugins

With the full ocfs2_locking_protocol hanging off of the
ocfs2_cluster_connection, ast wrappers can get the ast/bast pointers
there. They don't need to get them from their plugin structure.

The user plugin still needs the maximum locking protocol version,
though. This changes the plugin structure so that it only holds the max
version, not the entire ocfs2_locking_protocol pointer.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/stack_user.c | 6 +++---
fs/ocfs2/stackglue.c | 4 ++--
fs/ocfs2/stackglue.h | 2 +-
3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index b4cf616..5ae8812 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -62,8 +62,8 @@
* negotiated by the client. The client negotiates based on the maximum
* version advertised in /sys/fs/ocfs2/max_locking_protocol. The major
* number from the "SETV" message must match
- * ocfs2_user_plugin.sp_proto->lp_max_version.pv_major, and the minor number
- * must be less than or equal to ...->lp_max_version.pv_minor.
+ * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
+ * must be less than or equal to ...sp_max_version.pv_minor.
*
* Once this information has been set, mounts will be allowed. From this
* point on, the "DOWN" message can be sent for node down notification.
@@ -400,7 +400,7 @@ static int ocfs2_control_do_setversion_msg(struct file *file,
char *ptr = NULL;
struct ocfs2_control_private *p = file->private_data;
struct ocfs2_protocol_version *max =
- &ocfs2_user_plugin.sp_proto->lp_max_version;
+ &ocfs2_user_plugin.sp_max_proto;

if (ocfs2_control_get_handshake_state(file) !=
OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 010ecab..fc184c7 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -176,7 +176,7 @@ int ocfs2_stack_glue_register(struct ocfs2_stack_plugin *plugin)
spin_lock(&ocfs2_stack_lock);
if (!ocfs2_stack_lookup(plugin->sp_name)) {
plugin->sp_count = 0;
- plugin->sp_proto = lproto;
+ plugin->sp_max_proto = lproto->lp_max_version;
list_add(&plugin->sp_list, &ocfs2_stack_list);
printk(KERN_INFO "ocfs2: Registered cluster interface %s\n",
plugin->sp_name);
@@ -224,7 +224,7 @@ void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)

lproto = proto;
list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
- p->sp_proto = lproto;
+ p->sp_max_proto = lproto->lp_max_version;
}

spin_unlock(&ocfs2_stack_lock);
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index cf8bac2..77a7a9a 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -233,7 +233,7 @@ struct ocfs2_stack_plugin {
/* These are managed by the stackglue code. */
struct list_head sp_list;
unsigned int sp_count;
- struct ocfs2_locking_protocol *sp_proto;
+ struct ocfs2_protocol_version sp_max_proto;
};


--
1.6.6.1

2010-02-10 09:29:52

by Joel Becker

[permalink] [raw]
Subject: [PATCH 06/11] ocfs2: Hang the locking proto on the cluster conn and use it in asts.

With the ocfs2_cluster_connection hanging off of the ocfs2_dlm_lksb, we
have access to it in the ast and bast wrapper functions. Attach the
ocfs2_locking_protocol to the conn.

Now, instead of refering to a static variable for ast/bast pointers, the
wrappers can look at the connection. This means different connections
can have different ast/bast pointers, and it reduces the need for the
static pointer.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/stack_o2cb.c | 15 ++++-----------
fs/ocfs2/stack_user.c | 10 +++-------
fs/ocfs2/stackglue.c | 1 +
fs/ocfs2/stackglue.h | 1 +
4 files changed, 9 insertions(+), 18 deletions(-)

diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c
index 41e6bad..abf696a 100644
--- a/fs/ocfs2/stack_o2cb.c
+++ b/fs/ocfs2/stack_o2cb.c
@@ -163,28 +163,21 @@ static void o2dlm_lock_ast_wrapper(void *astarg)
{
struct ocfs2_dlm_lksb *lksb = astarg;

- BUG_ON(o2cb_stack.sp_proto == NULL);
-
- o2cb_stack.sp_proto->lp_lock_ast(lksb);
+ lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
}

static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
{
struct ocfs2_dlm_lksb *lksb = astarg;

- BUG_ON(o2cb_stack.sp_proto == NULL);
-
- o2cb_stack.sp_proto->lp_blocking_ast(lksb, level);
+ lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
}

static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
{
struct ocfs2_dlm_lksb *lksb = astarg;
-
int error = dlm_status_to_errno(status);

- BUG_ON(o2cb_stack.sp_proto == NULL);
-
/*
* In o2dlm, you can get both the lock_ast() for the lock being
* granted and the unlock_ast() for the CANCEL failing. A
@@ -199,7 +192,7 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
if (status == DLM_CANCELGRANT)
return;

- o2cb_stack.sp_proto->lp_unlock_ast(lksb, error);
+ lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, error);
}

static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
@@ -284,7 +277,7 @@ static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn)
struct dlm_protocol_version dlm_version;

BUG_ON(conn == NULL);
- BUG_ON(o2cb_stack.sp_proto == NULL);
+ BUG_ON(conn->cc_proto == NULL);

/* for now we only have one cluster/node, make sure we see it
* in the heartbeat universe */
diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index 31276ba..b4cf616 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -668,8 +668,6 @@ static void fsdlm_lock_ast_wrapper(void *astarg)
struct ocfs2_dlm_lksb *lksb = astarg;
int status = lksb->lksb_fsdlm.sb_status;

- BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
-
/*
* For now we're punting on the issue of other non-standard errors
* where we can't tell if the unlock_ast or lock_ast should be called.
@@ -681,18 +679,16 @@ static void fsdlm_lock_ast_wrapper(void *astarg)
*/

if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
- ocfs2_user_plugin.sp_proto->lp_unlock_ast(lksb, 0);
+ lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
else
- ocfs2_user_plugin.sp_proto->lp_lock_ast(lksb);
+ lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
}

static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
{
struct ocfs2_dlm_lksb *lksb = astarg;

- BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
-
- ocfs2_user_plugin.sp_proto->lp_blocking_ast(lksb, level);
+ lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
}

static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 8ef9a57..010ecab 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -343,6 +343,7 @@ int ocfs2_cluster_connect(const char *stack_name,
new_conn->cc_recovery_handler = recovery_handler;
new_conn->cc_recovery_data = recovery_data;

+ new_conn->cc_proto = lproto;
/* Start the new connection at our maximum compatibility level */
new_conn->cc_version = lproto->lp_max_version;

diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index bb32926..cf8bac2 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -100,6 +100,7 @@ struct ocfs2_cluster_connection {
char cc_name[GROUP_NAME_MAX];
int cc_namelen;
struct ocfs2_protocol_version cc_version;
+ struct ocfs2_locking_protocol *cc_proto;
void (*cc_recovery_handler)(int node_num, void *recovery_data);
void *cc_recovery_data;
void *cc_lockspace;
--
1.6.6.1

2010-02-10 09:31:30

by Joel Becker

[permalink] [raw]
Subject: [PATCH 08/11] ocfs2: Pass the locking protocol into ocfs2_cluster_connect().

Inside the stackglue, the locking protocol structure is hanging off of
the ocfs2_cluster_connection. This takes it one further; the locking
protocol is passed into ocfs2_cluster_connect(). Now different cluster
connections can have different locking protocols with distinct asts.
Note that all locking protocols have to keep their maximum protocol
version in lock-step.

With the protocol structure set in ocfs2_cluster_connect(), there is no
need for the stackglue to have a static pointer to a specific protocol
structure. We can change initialization to only pass in the maximum
protocol version.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/dlmglue.c | 168 +++++++++++++++++++++++++-------------------------
fs/ocfs2/stackglue.c | 43 ++++++++-----
fs/ocfs2/stackglue.h | 3 +-
3 files changed, 110 insertions(+), 104 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 4cb3ac2..dde55c4 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -1036,7 +1036,6 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
return lockres->l_pending_gen;
}

-
static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
@@ -1130,6 +1129,88 @@ out:
spin_unlock_irqrestore(&lockres->l_lock, flags);
}

+static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
+{
+ struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
+ unsigned long flags;
+
+ mlog_entry_void();
+
+ mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
+ lockres->l_unlock_action);
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ if (error) {
+ mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
+ "unlock_action %d\n", error, lockres->l_name,
+ lockres->l_unlock_action);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+ mlog_exit_void();
+ return;
+ }
+
+ switch(lockres->l_unlock_action) {
+ case OCFS2_UNLOCK_CANCEL_CONVERT:
+ mlog(0, "Cancel convert success for %s\n", lockres->l_name);
+ lockres->l_action = OCFS2_AST_INVALID;
+ /* Downconvert thread may have requeued this lock, we
+ * need to wake it. */
+ if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
+ ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
+ break;
+ case OCFS2_UNLOCK_DROP_LOCK:
+ lockres->l_level = DLM_LOCK_IV;
+ break;
+ default:
+ BUG();
+ }
+
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+ lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
+ wake_up(&lockres->l_event);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ mlog_exit_void();
+}
+
+/*
+ * This is the filesystem locking protocol. It provides the lock handling
+ * hooks for the underlying DLM. It has a maximum version number.
+ * The version number allows interoperability with systems running at
+ * the same major number and an equal or smaller minor number.
+ *
+ * Whenever the filesystem does new things with locks (adds or removes a
+ * lock, orders them differently, does different things underneath a lock),
+ * the version must be changed. The protocol is negotiated when joining
+ * the dlm domain. A node may join the domain if its major version is
+ * identical to all other nodes and its minor version is greater than
+ * or equal to all other nodes. When its minor version is greater than
+ * the other nodes, it will run at the minor version specified by the
+ * other nodes.
+ *
+ * If a locking change is made that will not be compatible with older
+ * versions, the major number must be increased and the minor version set
+ * to zero. If a change merely adds a behavior that can be disabled when
+ * speaking to older versions, the minor version must be increased. If a
+ * change adds a fully backwards compatible change (eg, LVB changes that
+ * are just ignored by older versions), the version does not need to be
+ * updated.
+ */
+static struct ocfs2_locking_protocol lproto = {
+ .lp_max_version = {
+ .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
+ .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
+ },
+ .lp_lock_ast = ocfs2_locking_ast,
+ .lp_blocking_ast = ocfs2_blocking_ast,
+ .lp_unlock_ast = ocfs2_unlock_ast,
+};
+
+void ocfs2_set_locking_protocol(void)
+{
+ ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
+}
+
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
int convert)
{
@@ -2959,7 +3040,7 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
status = ocfs2_cluster_connect(osb->osb_cluster_stack,
osb->uuid_str,
strlen(osb->uuid_str),
- ocfs2_do_node_down, osb,
+ &lproto, ocfs2_do_node_down, osb,
&conn);
if (status) {
mlog_errno(status);
@@ -3026,50 +3107,6 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
mlog_exit_void();
}

-static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
-{
- struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
- unsigned long flags;
-
- mlog_entry_void();
-
- mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
- lockres->l_unlock_action);
-
- spin_lock_irqsave(&lockres->l_lock, flags);
- if (error) {
- mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
- "unlock_action %d\n", error, lockres->l_name,
- lockres->l_unlock_action);
- spin_unlock_irqrestore(&lockres->l_lock, flags);
- mlog_exit_void();
- return;
- }
-
- switch(lockres->l_unlock_action) {
- case OCFS2_UNLOCK_CANCEL_CONVERT:
- mlog(0, "Cancel convert success for %s\n", lockres->l_name);
- lockres->l_action = OCFS2_AST_INVALID;
- /* Downconvert thread may have requeued this lock, we
- * need to wake it. */
- if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
- ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
- break;
- case OCFS2_UNLOCK_DROP_LOCK:
- lockres->l_level = DLM_LOCK_IV;
- break;
- default:
- BUG();
- }
-
- lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
- lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
- wake_up(&lockres->l_event);
- spin_unlock_irqrestore(&lockres->l_lock, flags);
-
- mlog_exit_void();
-}
-
static int ocfs2_drop_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
@@ -3843,45 +3880,6 @@ void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
ocfs2_cluster_unlock(osb, lockres, level);
}

-/*
- * This is the filesystem locking protocol. It provides the lock handling
- * hooks for the underlying DLM. It has a maximum version number.
- * The version number allows interoperability with systems running at
- * the same major number and an equal or smaller minor number.
- *
- * Whenever the filesystem does new things with locks (adds or removes a
- * lock, orders them differently, does different things underneath a lock),
- * the version must be changed. The protocol is negotiated when joining
- * the dlm domain. A node may join the domain if its major version is
- * identical to all other nodes and its minor version is greater than
- * or equal to all other nodes. When its minor version is greater than
- * the other nodes, it will run at the minor version specified by the
- * other nodes.
- *
- * If a locking change is made that will not be compatible with older
- * versions, the major number must be increased and the minor version set
- * to zero. If a change merely adds a behavior that can be disabled when
- * speaking to older versions, the minor version must be increased. If a
- * change adds a fully backwards compatible change (eg, LVB changes that
- * are just ignored by older versions), the version does not need to be
- * updated.
- */
-static struct ocfs2_locking_protocol lproto = {
- .lp_max_version = {
- .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
- .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
- },
- .lp_lock_ast = ocfs2_locking_ast,
- .lp_blocking_ast = ocfs2_blocking_ast,
- .lp_unlock_ast = ocfs2_unlock_ast,
-};
-
-void ocfs2_set_locking_protocol(void)
-{
- ocfs2_stack_glue_set_locking_protocol(&lproto);
-}
-
-
static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index fc184c7..31db2e8 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -36,7 +36,7 @@
#define OCFS2_STACK_PLUGIN_USER "user"
#define OCFS2_MAX_HB_CTL_PATH 256

-static struct ocfs2_locking_protocol *lproto;
+static struct ocfs2_protocol_version locking_max_version;
static DEFINE_SPINLOCK(ocfs2_stack_lock);
static LIST_HEAD(ocfs2_stack_list);
static char cluster_stack_name[OCFS2_STACK_LABEL_LEN + 1];
@@ -176,7 +176,7 @@ int ocfs2_stack_glue_register(struct ocfs2_stack_plugin *plugin)
spin_lock(&ocfs2_stack_lock);
if (!ocfs2_stack_lookup(plugin->sp_name)) {
plugin->sp_count = 0;
- plugin->sp_max_proto = lproto->lp_max_version;
+ plugin->sp_max_proto = locking_max_version;
list_add(&plugin->sp_list, &ocfs2_stack_list);
printk(KERN_INFO "ocfs2: Registered cluster interface %s\n",
plugin->sp_name);
@@ -213,23 +213,23 @@ void ocfs2_stack_glue_unregister(struct ocfs2_stack_plugin *plugin)
}
EXPORT_SYMBOL_GPL(ocfs2_stack_glue_unregister);

-void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)
+void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto)
{
struct ocfs2_stack_plugin *p;

- BUG_ON(proto == NULL);
-
spin_lock(&ocfs2_stack_lock);
- BUG_ON(active_stack != NULL);
+ if (memcmp(max_proto, &locking_max_version,
+ sizeof(struct ocfs2_protocol_version))) {
+ BUG_ON(locking_max_version.pv_major != 0);

- lproto = proto;
- list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
- p->sp_max_proto = lproto->lp_max_version;
+ locking_max_version = *max_proto;
+ list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
+ p->sp_max_proto = locking_max_version;
+ }
}
-
spin_unlock(&ocfs2_stack_lock);
}
-EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol);
+EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_max_proto_version);


/*
@@ -245,8 +245,6 @@ int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
void *name,
unsigned int namelen)
{
- BUG_ON(lproto == NULL);
-
if (!lksb->lksb_conn)
lksb->lksb_conn = conn;
else
@@ -260,7 +258,6 @@ int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
struct ocfs2_dlm_lksb *lksb,
u32 flags)
{
- BUG_ON(lproto == NULL);
BUG_ON(lksb->lksb_conn == NULL);

return active_stack->sp_ops->dlm_unlock(conn, lksb, flags);
@@ -314,6 +311,7 @@ EXPORT_SYMBOL_GPL(ocfs2_plock);
int ocfs2_cluster_connect(const char *stack_name,
const char *group,
int grouplen,
+ struct ocfs2_locking_protocol *lproto,
void (*recovery_handler)(int node_num,
void *recovery_data),
void *recovery_data,
@@ -331,6 +329,12 @@ int ocfs2_cluster_connect(const char *stack_name,
goto out;
}

+ if (memcmp(&lproto->lp_max_version, &locking_max_version,
+ sizeof(struct ocfs2_protocol_version))) {
+ rc = -EINVAL;
+ goto out;
+ }
+
new_conn = kzalloc(sizeof(struct ocfs2_cluster_connection),
GFP_KERNEL);
if (!new_conn) {
@@ -456,10 +460,10 @@ static ssize_t ocfs2_max_locking_protocol_show(struct kobject *kobj,
ssize_t ret = 0;

spin_lock(&ocfs2_stack_lock);
- if (lproto)
+ if (locking_max_version.pv_major)
ret = snprintf(buf, PAGE_SIZE, "%u.%u\n",
- lproto->lp_max_version.pv_major,
- lproto->lp_max_version.pv_minor);
+ locking_max_version.pv_major,
+ locking_max_version.pv_minor);
spin_unlock(&ocfs2_stack_lock);

return ret;
@@ -688,7 +692,10 @@ static int __init ocfs2_stack_glue_init(void)

static void __exit ocfs2_stack_glue_exit(void)
{
- lproto = NULL;
+ memset(&locking_max_version, 0,
+ sizeof(struct ocfs2_protocol_version));
+ locking_max_version.pv_major = 0;
+ locking_max_version.pv_minor = 0;
ocfs2_sysfs_exit();
if (ocfs2_table_header)
unregister_sysctl_table(ocfs2_table_header);
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index 77a7a9a..b1981ba 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -241,6 +241,7 @@ struct ocfs2_stack_plugin {
int ocfs2_cluster_connect(const char *stack_name,
const char *group,
int grouplen,
+ struct ocfs2_locking_protocol *lproto,
void (*recovery_handler)(int node_num,
void *recovery_data),
void *recovery_data,
@@ -270,7 +271,7 @@ int ocfs2_stack_supports_plocks(void);
int ocfs2_plock(struct ocfs2_cluster_connection *conn, u64 ino,
struct file *file, int cmd, struct file_lock *fl);

-void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto);
+void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto);


/* Used by stack plugins */
--
1.6.6.1

2010-02-10 09:30:20

by Joel Becker

[permalink] [raw]
Subject: [PATCH 04/11] ocfs2: Pass lksbs back from stackglue ast/bast functions.

The stackglue ast and bast functions tried to maintain the fiction that
their arguments were void pointers. In reality, stack_user.c had to
know that the argument was an ocfs2_lock_res in order to get the status
off of the lksb. That's ugly.

This changes stackglue to always pass the lksb as the argument to ast
and bast functions. The caller can always use container_of() to get the
ocfs2_lock_res or user_dlm_lock_res. The net effect to the caller is
zero. They still get back the lockres in their ast. stackglue gets
cleaner, and now can use the lksb itself.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/dlmglue.c | 34 +++++++++++++++++-----------------
fs/ocfs2/stack_o2cb.c | 22 +++++++++++++---------
fs/ocfs2/stack_user.c | 29 +++++++++++------------------
fs/ocfs2/stackglue.c | 19 ++++++++-----------
fs/ocfs2/stackglue.h | 42 +++++++++++++++++++++---------------------
5 files changed, 70 insertions(+), 76 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index c5e4a49..28df5f7 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -297,6 +297,11 @@ static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

+static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(union ocfs2_dlm_lksb *lksb)
+{
+ return container_of(lksb, struct ocfs2_lock_res, l_lksb);
+}
+
static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
BUG_ON(!ocfs2_is_inode_lock(lockres));
@@ -1032,9 +1037,9 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
}


-static void ocfs2_blocking_ast(void *opaque, int level)
+static void ocfs2_blocking_ast(union ocfs2_dlm_lksb *lksb, int level)
{
- struct ocfs2_lock_res *lockres = opaque;
+ struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
int needs_downconvert;
unsigned long flags;
@@ -1063,9 +1068,9 @@ static void ocfs2_blocking_ast(void *opaque, int level)
ocfs2_wake_downconvert_thread(osb);
}

-static void ocfs2_locking_ast(void *opaque)
+static void ocfs2_locking_ast(union ocfs2_dlm_lksb *lksb)
{
- struct ocfs2_lock_res *lockres = opaque;
+ struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
unsigned long flags;
int status;
@@ -1179,8 +1184,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
&lockres->l_lksb,
dlm_flags,
lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- lockres);
+ OCFS2_LOCK_ID_MAX_LEN - 1);
lockres_clear_pending(lockres, gen, osb);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -1392,8 +1396,7 @@ again:
&lockres->l_lksb,
lkm_flags,
lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- lockres);
+ OCFS2_LOCK_ID_MAX_LEN - 1);
lockres_clear_pending(lockres, gen, osb);
if (ret) {
if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
@@ -1827,8 +1830,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
spin_unlock_irqrestore(&lockres->l_lock, flags);

ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
- lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
- lockres);
+ lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
if (ret) {
if (!trylock || (ret != -EAGAIN)) {
ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -3024,9 +3026,9 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
mlog_exit_void();
}

-static void ocfs2_unlock_ast(void *opaque, int error)
+static void ocfs2_unlock_ast(union ocfs2_dlm_lksb *lksb, int error)
{
- struct ocfs2_lock_res *lockres = opaque;
+ struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
unsigned long flags;

mlog_entry_void();
@@ -3135,8 +3137,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,

mlog(0, "lock %s\n", lockres->l_name);

- ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
- lockres);
+ ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
@@ -3277,8 +3278,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
&lockres->l_lksb,
dlm_flags,
lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- lockres);
+ OCFS2_LOCK_ID_MAX_LEN - 1);
lockres_clear_pending(lockres, generation, osb);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -3333,7 +3333,7 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
mlog(0, "lock %s\n", lockres->l_name);

ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
- DLM_LKF_CANCEL, lockres);
+ DLM_LKF_CANCEL);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 0);
diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c
index e49c410..e26a789 100644
--- a/fs/ocfs2/stack_o2cb.c
+++ b/fs/ocfs2/stack_o2cb.c
@@ -161,20 +161,26 @@ static int dlm_status_to_errno(enum dlm_status status)

static void o2dlm_lock_ast_wrapper(void *astarg)
{
+ union ocfs2_dlm_lksb *lksb = astarg;
+
BUG_ON(o2cb_stack.sp_proto == NULL);

- o2cb_stack.sp_proto->lp_lock_ast(astarg);
+ o2cb_stack.sp_proto->lp_lock_ast(lksb);
}

static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
{
+ union ocfs2_dlm_lksb *lksb = astarg;
+
BUG_ON(o2cb_stack.sp_proto == NULL);

- o2cb_stack.sp_proto->lp_blocking_ast(astarg, level);
+ o2cb_stack.sp_proto->lp_blocking_ast(lksb, level);
}

static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
{
+ union ocfs2_dlm_lksb *lksb = astarg;
+
int error = dlm_status_to_errno(status);

BUG_ON(o2cb_stack.sp_proto == NULL);
@@ -193,7 +199,7 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
if (status == DLM_CANCELGRANT)
return;

- o2cb_stack.sp_proto->lp_unlock_ast(astarg, error);
+ o2cb_stack.sp_proto->lp_unlock_ast(lksb, error);
}

static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
@@ -201,8 +207,7 @@ static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
- unsigned int namelen,
- void *astarg)
+ unsigned int namelen)
{
enum dlm_status status;
int o2dlm_mode = mode_to_o2dlm(mode);
@@ -211,7 +216,7 @@ static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,

status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm,
o2dlm_flags, name, namelen,
- o2dlm_lock_ast_wrapper, astarg,
+ o2dlm_lock_ast_wrapper, lksb,
o2dlm_blocking_ast_wrapper);
ret = dlm_status_to_errno(status);
return ret;
@@ -219,15 +224,14 @@ static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,

static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
- u32 flags,
- void *astarg)
+ u32 flags)
{
enum dlm_status status;
int o2dlm_flags = flags_to_o2dlm(flags);
int ret;

status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm,
- o2dlm_flags, o2dlm_unlock_ast_wrapper, astarg);
+ o2dlm_flags, o2dlm_unlock_ast_wrapper, lksb);
ret = dlm_status_to_errno(status);
return ret;
}
diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index da78a2a..129b931 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -25,7 +25,6 @@
#include <linux/reboot.h>
#include <asm/uaccess.h>

-#include "ocfs2.h" /* For struct ocfs2_lock_res */
#include "stackglue.h"

#include <linux/dlm_plock.h>
@@ -664,16 +663,10 @@ static void ocfs2_control_exit(void)
-rc);
}

-static struct dlm_lksb *fsdlm_astarg_to_lksb(void *astarg)
-{
- struct ocfs2_lock_res *res = astarg;
- return &res->l_lksb.lksb_fsdlm;
-}
-
static void fsdlm_lock_ast_wrapper(void *astarg)
{
- struct dlm_lksb *lksb = fsdlm_astarg_to_lksb(astarg);
- int status = lksb->sb_status;
+ union ocfs2_dlm_lksb *lksb = astarg;
+ int status = lksb->lksb_fsdlm.sb_status;

BUG_ON(ocfs2_user_plugin.sp_proto == NULL);

@@ -688,16 +681,18 @@ static void fsdlm_lock_ast_wrapper(void *astarg)
*/

if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
- ocfs2_user_plugin.sp_proto->lp_unlock_ast(astarg, 0);
+ ocfs2_user_plugin.sp_proto->lp_unlock_ast(lksb, 0);
else
- ocfs2_user_plugin.sp_proto->lp_lock_ast(astarg);
+ ocfs2_user_plugin.sp_proto->lp_lock_ast(lksb);
}

static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
{
+ union ocfs2_dlm_lksb *lksb = astarg;
+
BUG_ON(ocfs2_user_plugin.sp_proto == NULL);

- ocfs2_user_plugin.sp_proto->lp_blocking_ast(astarg, level);
+ ocfs2_user_plugin.sp_proto->lp_blocking_ast(lksb, level);
}

static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
@@ -705,8 +700,7 @@ static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
- unsigned int namelen,
- void *astarg)
+ unsigned int namelen)
{
int ret;

@@ -716,20 +710,19 @@ static int user_dlm_lock(struct ocfs2_cluster_connection *conn,

ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
flags|DLM_LKF_NODLCKWT, name, namelen, 0,
- fsdlm_lock_ast_wrapper, astarg,
+ fsdlm_lock_ast_wrapper, lksb,
fsdlm_blocking_ast_wrapper);
return ret;
}

static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
- u32 flags,
- void *astarg)
+ u32 flags)
{
int ret;

ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
- flags, &lksb->lksb_fsdlm, astarg);
+ flags, &lksb->lksb_fsdlm, lksb);
return ret;
}

diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index f3df0ba..3500d98 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -233,35 +233,32 @@ EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol);


/*
- * The ocfs2_dlm_lock() and ocfs2_dlm_unlock() functions take
- * "struct ocfs2_lock_res *astarg" instead of "void *astarg" because the
- * underlying stack plugins need to pilfer the lksb off of the lock_res.
- * If some other structure needs to be passed as an astarg, the plugins
- * will need to be given a different avenue to the lksb.
+ * The ocfs2_dlm_lock() and ocfs2_dlm_unlock() functions take no argument
+ * for the ast and bast functions. They will pass the lksb to the ast
+ * and bast. The caller can wrap the lksb with their own structure to
+ * get more information.
*/
int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
- unsigned int namelen,
- struct ocfs2_lock_res *astarg)
+ unsigned int namelen)
{
BUG_ON(lproto == NULL);

return active_stack->sp_ops->dlm_lock(conn, mode, lksb, flags,
- name, namelen, astarg);
+ name, namelen);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_lock);

int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
- u32 flags,
- struct ocfs2_lock_res *astarg)
+ u32 flags)
{
BUG_ON(lproto == NULL);

- return active_stack->sp_ops->dlm_unlock(conn, lksb, flags, astarg);
+ return active_stack->sp_ops->dlm_unlock(conn, lksb, flags);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_unlock);

diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index 03a44d6..d699117 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -56,17 +56,6 @@ struct ocfs2_protocol_version {
};

/*
- * The ocfs2_locking_protocol defines the handlers called on ocfs2's behalf.
- */
-struct ocfs2_locking_protocol {
- struct ocfs2_protocol_version lp_max_version;
- void (*lp_lock_ast)(void *astarg);
- void (*lp_blocking_ast)(void *astarg, int level);
- void (*lp_unlock_ast)(void *astarg, int error);
-};
-
-
-/*
* The dlm_lockstatus struct includes lvb space, but the dlm_lksb struct only
* has a pointer to separately allocated lvb space. This struct exists only to
* include in the lksb union to make space for a combined dlm_lksb and lvb.
@@ -88,6 +77,17 @@ union ocfs2_dlm_lksb {
};

/*
+ * The ocfs2_locking_protocol defines the handlers called on ocfs2's behalf.
+ */
+struct ocfs2_locking_protocol {
+ struct ocfs2_protocol_version lp_max_version;
+ void (*lp_lock_ast)(union ocfs2_dlm_lksb *lksb);
+ void (*lp_blocking_ast)(union ocfs2_dlm_lksb *lksb, int level);
+ void (*lp_unlock_ast)(union ocfs2_dlm_lksb *lksb, int error);
+};
+
+
+/*
* A cluster connection. Mostly opaque to ocfs2, the connection holds
* state for the underlying stack. ocfs2 does use cc_version to determine
* locking compatibility.
@@ -155,27 +155,29 @@ struct ocfs2_stack_operations {
*
* ast and bast functions are not part of the call because the
* stack will likely want to wrap ast and bast calls before passing
- * them to stack->sp_proto.
+ * them to stack->sp_proto. There is no astarg. The lksb will
+ * be passed back to the ast and bast functions. The caller can
+ * use this to find their object.
*/
int (*dlm_lock)(struct ocfs2_cluster_connection *conn,
int mode,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
- unsigned int namelen,
- void *astarg);
+ unsigned int namelen);

/*
* Call the underlying dlm unlock function. The ->dlm_unlock()
* function should convert the flags as appropriate.
*
* The unlock ast is not passed, as the stack will want to wrap
- * it before calling stack->sp_proto->lp_unlock_ast().
+ * it before calling stack->sp_proto->lp_unlock_ast(). There is
+ * no astarg. The lksb will be passed back to the unlock ast
+ * function. The caller can use this to find their object.
*/
int (*dlm_unlock)(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
- u32 flags,
- void *astarg);
+ u32 flags);

/*
* Return the status of the current lock status block. The fs
@@ -249,12 +251,10 @@ int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
- unsigned int namelen,
- struct ocfs2_lock_res *astarg);
+ unsigned int namelen);
int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
- u32 flags,
- struct ocfs2_lock_res *astarg);
+ u32 flags);

int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb);
int ocfs2_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb);
--
1.6.6.1

2010-02-10 09:30:08

by Joel Becker

[permalink] [raw]
Subject: [PATCH 10/11] ocfs2_dlmfs: Use the stackglue.

Rather than directly using o2dlm, dlmfs can now use the stackglue. This
allows it to use userspace cluster stacks and fs/dlm. This commit
forces o2cb for now. A latter commit will bump the protocol version and
allow non-o2cb stacks.

This is one big sed, really. LKM_xxMODE becomes DLM_LOCK_xx. LKM_flag
becomes DLM_LKF_flag.

We also learn to check that the LVB is valid before reading it. Any DLM
can lose the contents of the LVB during a complicated recovery. userdlm
should be checking this. Now it does. dlmfs will return 0 from read(2)
if the LVB was invalid.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/dlmfs/dlmfs.c | 57 ++++------
fs/ocfs2/dlmfs/userdlm.c | 266 +++++++++++++++++++++++----------------------
fs/ocfs2/dlmfs/userdlm.h | 16 ++--
3 files changed, 166 insertions(+), 173 deletions(-)

diff --git a/fs/ocfs2/dlmfs/dlmfs.c b/fs/ocfs2/dlmfs/dlmfs.c
index 13ac2bf..8697366 100644
--- a/fs/ocfs2/dlmfs/dlmfs.c
+++ b/fs/ocfs2/dlmfs/dlmfs.c
@@ -47,21 +47,13 @@

#include <asm/uaccess.h>

-
-#include "cluster/nodemanager.h"
-#include "cluster/heartbeat.h"
-#include "cluster/tcp.h"
-
-#include "dlm/dlmapi.h"
-
+#include "stackglue.h"
#include "userdlm.h"
-
#include "dlmfsver.h"

#define MLOG_MASK_PREFIX ML_DLMFS
#include "cluster/masklog.h"

-#include "ocfs2_lockingver.h"

static const struct super_operations dlmfs_ops;
static const struct file_operations dlmfs_file_operations;
@@ -72,15 +64,6 @@ static struct kmem_cache *dlmfs_inode_cache;

struct workqueue_struct *user_dlm_worker;

-/*
- * This is the userdlmfs locking protocol version.
- *
- * See fs/ocfs2/dlmglue.c for more details on locking versions.
- */
-static const struct dlm_protocol_version user_locking_protocol = {
- .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
- .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
-};


/*
@@ -259,7 +242,7 @@ static ssize_t dlmfs_file_read(struct file *filp,
loff_t *ppos)
{
int bytes_left;
- ssize_t readlen;
+ ssize_t readlen, got;
char *lvb_buf;
struct inode *inode = filp->f_path.dentry->d_inode;

@@ -285,9 +268,13 @@ static ssize_t dlmfs_file_read(struct file *filp,
if (!lvb_buf)
return -ENOMEM;

- user_dlm_read_lvb(inode, lvb_buf, readlen);
- bytes_left = __copy_to_user(buf, lvb_buf, readlen);
- readlen -= bytes_left;
+ got = user_dlm_read_lvb(inode, lvb_buf, readlen);
+ if (got) {
+ BUG_ON(got != readlen);
+ bytes_left = __copy_to_user(buf, lvb_buf, readlen);
+ readlen -= bytes_left;
+ } else
+ readlen = 0;

kfree(lvb_buf);

@@ -346,7 +333,7 @@ static void dlmfs_init_once(void *foo)
struct dlmfs_inode_private *ip =
(struct dlmfs_inode_private *) foo;

- ip->ip_dlm = NULL;
+ ip->ip_conn = NULL;
ip->ip_parent = NULL;

inode_init_once(&ip->ip_vfs_inode);
@@ -388,14 +375,14 @@ static void dlmfs_clear_inode(struct inode *inode)
goto clear_fields;
}

- mlog(0, "we're a directory, ip->ip_dlm = 0x%p\n", ip->ip_dlm);
+ mlog(0, "we're a directory, ip->ip_conn = 0x%p\n", ip->ip_conn);
/* we must be a directory. If required, lets unregister the
* dlm context now. */
- if (ip->ip_dlm)
- user_dlm_unregister_context(ip->ip_dlm);
+ if (ip->ip_conn)
+ user_dlm_unregister(ip->ip_conn);
clear_fields:
ip->ip_parent = NULL;
- ip->ip_dlm = NULL;
+ ip->ip_conn = NULL;
}

static struct backing_dev_info dlmfs_backing_dev_info = {
@@ -445,7 +432,7 @@ static struct inode *dlmfs_get_inode(struct inode *parent,
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;

ip = DLMFS_I(inode);
- ip->ip_dlm = DLMFS_I(parent)->ip_dlm;
+ ip->ip_conn = DLMFS_I(parent)->ip_conn;

switch (mode & S_IFMT) {
default:
@@ -499,13 +486,12 @@ static int dlmfs_mkdir(struct inode * dir,
struct inode *inode = NULL;
struct qstr *domain = &dentry->d_name;
struct dlmfs_inode_private *ip;
- struct dlm_ctxt *dlm;
- struct dlm_protocol_version proto = user_locking_protocol;
+ struct ocfs2_cluster_connection *conn;

mlog(0, "mkdir %.*s\n", domain->len, domain->name);

/* verify that we have a proper domain */
- if (domain->len >= O2NM_MAX_NAME_LEN) {
+ if (domain->len >= GROUP_NAME_MAX) {
status = -EINVAL;
mlog(ML_ERROR, "invalid domain name for directory.\n");
goto bail;
@@ -520,14 +506,14 @@ static int dlmfs_mkdir(struct inode * dir,

ip = DLMFS_I(inode);

- dlm = user_dlm_register_context(domain, &proto);
- if (IS_ERR(dlm)) {
- status = PTR_ERR(dlm);
+ conn = user_dlm_register(domain);
+ if (IS_ERR(conn)) {
+ status = PTR_ERR(conn);
mlog(ML_ERROR, "Error %d could not register domain \"%.*s\"\n",
status, domain->len, domain->name);
goto bail;
}
- ip->ip_dlm = dlm;
+ ip->ip_conn = conn;

inc_nlink(dir);
d_instantiate(dentry, inode);
@@ -696,6 +682,7 @@ static int __init init_dlmfs_fs(void)
}
cleanup_worker = 1;

+ user_dlm_set_locking_protocol();
status = register_filesystem(&dlmfs_fs_type);
bail:
if (status) {
diff --git a/fs/ocfs2/dlmfs/userdlm.c b/fs/ocfs2/dlmfs/userdlm.c
index 6adae70..c1b6a56 100644
--- a/fs/ocfs2/dlmfs/userdlm.c
+++ b/fs/ocfs2/dlmfs/userdlm.c
@@ -34,18 +34,19 @@
#include <linux/types.h>
#include <linux/crc32.h>

-
-#include "cluster/nodemanager.h"
-#include "cluster/heartbeat.h"
-#include "cluster/tcp.h"
-
-#include "dlm/dlmapi.h"
-
+#include "ocfs2_lockingver.h"
+#include "stackglue.h"
#include "userdlm.h"

#define MLOG_MASK_PREFIX ML_DLMFS
#include "cluster/masklog.h"

+
+static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
+{
+ return container_of(lksb, struct user_lock_res, l_lksb);
+}
+
static inline int user_check_wait_flag(struct user_lock_res *lockres,
int flag)
{
@@ -73,15 +74,15 @@ static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
}

/* I heart container_of... */
-static inline struct dlm_ctxt *
-dlm_ctxt_from_user_lockres(struct user_lock_res *lockres)
+static inline struct ocfs2_cluster_connection *
+cluster_connection_from_user_lockres(struct user_lock_res *lockres)
{
struct dlmfs_inode_private *ip;

ip = container_of(lockres,
struct dlmfs_inode_private,
ip_lockres);
- return ip->ip_dlm;
+ return ip->ip_conn;
}

static struct inode *
@@ -103,9 +104,9 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
}

#define user_log_dlm_error(_func, _stat, _lockres) do { \
- mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \
- "resource %.*s: %s\n", dlm_errname(_stat), _func, \
- _lockres->l_namelen, _lockres->l_name, dlm_errmsg(_stat)); \
+ mlog(ML_ERROR, "Dlm error %d while calling %s on " \
+ "resource %.*s\n", _stat, _func, \
+ _lockres->l_namelen, _lockres->l_name); \
} while (0)

/* WARNING: This function lives in a world where the only three lock
@@ -113,34 +114,34 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
* lock types are added. */
static inline int user_highest_compat_lock_level(int level)
{
- int new_level = LKM_EXMODE;
+ int new_level = DLM_LOCK_EX;

- if (level == LKM_EXMODE)
- new_level = LKM_NLMODE;
- else if (level == LKM_PRMODE)
- new_level = LKM_PRMODE;
+ if (level == DLM_LOCK_EX)
+ new_level = DLM_LOCK_NL;
+ else if (level == DLM_LOCK_PR)
+ new_level = DLM_LOCK_PR;
return new_level;
}

-static void user_ast(void *opaque)
+static void user_ast(struct ocfs2_dlm_lksb *lksb)
{
- struct user_lock_res *lockres = opaque;
- struct dlm_lockstatus *lksb;
+ struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
+ int status;

mlog(0, "AST fired for lockres %.*s\n", lockres->l_namelen,
lockres->l_name);

spin_lock(&lockres->l_lock);

- lksb = &(lockres->l_lksb);
- if (lksb->status != DLM_NORMAL) {
+ status = ocfs2_dlm_lock_status(&lockres->l_lksb);
+ if (status) {
mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
- lksb->status, lockres->l_namelen, lockres->l_name);
+ status, lockres->l_namelen, lockres->l_name);
spin_unlock(&lockres->l_lock);
return;
}

- mlog_bug_on_msg(lockres->l_requested == LKM_IVMODE,
+ mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
"Lockres %.*s, requested ivmode. flags 0x%x\n",
lockres->l_namelen, lockres->l_name, lockres->l_flags);

@@ -148,13 +149,13 @@ static void user_ast(void *opaque)
if (lockres->l_requested < lockres->l_level) {
if (lockres->l_requested <=
user_highest_compat_lock_level(lockres->l_blocking)) {
- lockres->l_blocking = LKM_NLMODE;
+ lockres->l_blocking = DLM_LOCK_NL;
lockres->l_flags &= ~USER_LOCK_BLOCKED;
}
}

lockres->l_level = lockres->l_requested;
- lockres->l_requested = LKM_IVMODE;
+ lockres->l_requested = DLM_LOCK_IV;
lockres->l_flags |= USER_LOCK_ATTACHED;
lockres->l_flags &= ~USER_LOCK_BUSY;

@@ -193,11 +194,11 @@ static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
return;

switch (lockres->l_blocking) {
- case LKM_EXMODE:
+ case DLM_LOCK_EX:
if (!lockres->l_ex_holders && !lockres->l_ro_holders)
queue = 1;
break;
- case LKM_PRMODE:
+ case DLM_LOCK_PR:
if (!lockres->l_ex_holders)
queue = 1;
break;
@@ -209,9 +210,9 @@ static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
__user_dlm_queue_lockres(lockres);
}

-static void user_bast(void *opaque, int level)
+static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
{
- struct user_lock_res *lockres = opaque;
+ struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

mlog(0, "Blocking AST fired for lockres %.*s. Blocking level %d\n",
lockres->l_namelen, lockres->l_name, level);
@@ -227,15 +228,15 @@ static void user_bast(void *opaque, int level)
wake_up(&lockres->l_event);
}

-static void user_unlock_ast(void *opaque, enum dlm_status status)
+static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
{
- struct user_lock_res *lockres = opaque;
+ struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

mlog(0, "UNLOCK AST called on lock %.*s\n", lockres->l_namelen,
lockres->l_name);

- if (status != DLM_NORMAL && status != DLM_CANCELGRANT)
- mlog(ML_ERROR, "Dlm returns status %d\n", status);
+ if (status)
+ mlog(ML_ERROR, "dlm returns status %d\n", status);

spin_lock(&lockres->l_lock);
/* The teardown flag gets set early during the unlock process,
@@ -243,7 +244,7 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
* for a concurrent cancel. */
if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
&& !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
- lockres->l_level = LKM_IVMODE;
+ lockres->l_level = DLM_LOCK_IV;
} else if (status == DLM_CANCELGRANT) {
/* We tried to cancel a convert request, but it was
* already granted. Don't clear the busy flag - the
@@ -254,7 +255,7 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
} else {
BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
/* Cancel succeeded, we want to re-queue */
- lockres->l_requested = LKM_IVMODE; /* cancel an
+ lockres->l_requested = DLM_LOCK_IV; /* cancel an
* upconvert
* request. */
lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
@@ -271,6 +272,21 @@ out_noclear:
wake_up(&lockres->l_event);
}

+/*
+ * This is the userdlmfs locking protocol version.
+ *
+ * See fs/ocfs2/dlmglue.c for more details on locking versions.
+ */
+static struct ocfs2_locking_protocol user_dlm_lproto = {
+ .lp_max_version = {
+ .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
+ .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
+ },
+ .lp_lock_ast = user_ast,
+ .lp_blocking_ast = user_bast,
+ .lp_unlock_ast = user_unlock_ast,
+};
+
static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
{
struct inode *inode;
@@ -283,7 +299,8 @@ static void user_dlm_unblock_lock(struct work_struct *work)
int new_level, status;
struct user_lock_res *lockres =
container_of(work, struct user_lock_res, l_work);
- struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
+ struct ocfs2_cluster_connection *conn =
+ cluster_connection_from_user_lockres(lockres);

mlog(0, "processing lockres %.*s\n", lockres->l_namelen,
lockres->l_name);
@@ -322,20 +339,17 @@ static void user_dlm_unblock_lock(struct work_struct *work)
lockres->l_flags |= USER_LOCK_IN_CANCEL;
spin_unlock(&lockres->l_lock);

- status = dlmunlock(dlm,
- &lockres->l_lksb,
- LKM_CANCEL,
- user_unlock_ast,
- lockres);
- if (status != DLM_NORMAL)
- user_log_dlm_error("dlmunlock", status, lockres);
+ status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
+ DLM_LKF_CANCEL);
+ if (status)
+ user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
goto drop_ref;
}

/* If there are still incompat holders, we can exit safely
* without worrying about re-queueing this lock as that will
* happen on the last call to user_cluster_unlock. */
- if ((lockres->l_blocking == LKM_EXMODE)
+ if ((lockres->l_blocking == DLM_LOCK_EX)
&& (lockres->l_ex_holders || lockres->l_ro_holders)) {
spin_unlock(&lockres->l_lock);
mlog(0, "can't downconvert for ex: ro = %u, ex = %u\n",
@@ -343,7 +357,7 @@ static void user_dlm_unblock_lock(struct work_struct *work)
goto drop_ref;
}

- if ((lockres->l_blocking == LKM_PRMODE)
+ if ((lockres->l_blocking == DLM_LOCK_PR)
&& lockres->l_ex_holders) {
spin_unlock(&lockres->l_lock);
mlog(0, "can't downconvert for pr: ex = %u\n",
@@ -360,17 +374,12 @@ static void user_dlm_unblock_lock(struct work_struct *work)
spin_unlock(&lockres->l_lock);

/* need lock downconvert request now... */
- status = dlmlock(dlm,
- new_level,
- &lockres->l_lksb,
- LKM_CONVERT|LKM_VALBLK,
- lockres->l_name,
- lockres->l_namelen,
- user_ast,
- lockres,
- user_bast);
- if (status != DLM_NORMAL) {
- user_log_dlm_error("dlmlock", status, lockres);
+ status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
+ DLM_LKF_CONVERT|DLM_LKF_VALBLK,
+ lockres->l_name,
+ lockres->l_namelen);
+ if (status) {
+ user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
user_recover_from_dlm_error(lockres);
}

@@ -382,10 +391,10 @@ static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
int level)
{
switch(level) {
- case LKM_EXMODE:
+ case DLM_LOCK_EX:
lockres->l_ex_holders++;
break;
- case LKM_PRMODE:
+ case DLM_LOCK_PR:
lockres->l_ro_holders++;
break;
default:
@@ -410,10 +419,11 @@ int user_dlm_cluster_lock(struct user_lock_res *lockres,
int lkm_flags)
{
int status, local_flags;
- struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
+ struct ocfs2_cluster_connection *conn =
+ cluster_connection_from_user_lockres(lockres);

- if (level != LKM_EXMODE &&
- level != LKM_PRMODE) {
+ if (level != DLM_LOCK_EX &&
+ level != DLM_LOCK_PR) {
mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
lockres->l_namelen, lockres->l_name);
status = -EINVAL;
@@ -422,7 +432,7 @@ int user_dlm_cluster_lock(struct user_lock_res *lockres,

mlog(0, "lockres %.*s: asking for %s lock, passed flags = 0x%x\n",
lockres->l_namelen, lockres->l_name,
- (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE",
+ (level == DLM_LOCK_EX) ? "DLM_LOCK_EX" : "DLM_LOCK_PR",
lkm_flags);

again:
@@ -457,35 +467,26 @@ again:
}

if (level > lockres->l_level) {
- local_flags = lkm_flags | LKM_VALBLK;
- if (lockres->l_level != LKM_IVMODE)
- local_flags |= LKM_CONVERT;
+ local_flags = lkm_flags | DLM_LKF_VALBLK;
+ if (lockres->l_level != DLM_LOCK_IV)
+ local_flags |= DLM_LKF_CONVERT;

lockres->l_requested = level;
lockres->l_flags |= USER_LOCK_BUSY;
spin_unlock(&lockres->l_lock);

- BUG_ON(level == LKM_IVMODE);
- BUG_ON(level == LKM_NLMODE);
+ BUG_ON(level == DLM_LOCK_IV);
+ BUG_ON(level == DLM_LOCK_NL);

/* call dlm_lock to upgrade lock now */
- status = dlmlock(dlm,
- level,
- &lockres->l_lksb,
- local_flags,
- lockres->l_name,
- lockres->l_namelen,
- user_ast,
- lockres,
- user_bast);
- if (status != DLM_NORMAL) {
- if ((lkm_flags & LKM_NOQUEUE) &&
- (status == DLM_NOTQUEUED))
- status = -EAGAIN;
- else {
- user_log_dlm_error("dlmlock", status, lockres);
- status = -EINVAL;
- }
+ status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
+ local_flags, lockres->l_name,
+ lockres->l_namelen);
+ if (status) {
+ if ((lkm_flags & DLM_LKF_NOQUEUE) &&
+ (status != -EAGAIN))
+ user_log_dlm_error("ocfs2_dlm_lock",
+ status, lockres);
user_recover_from_dlm_error(lockres);
goto bail;
}
@@ -506,11 +507,11 @@ static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
int level)
{
switch(level) {
- case LKM_EXMODE:
+ case DLM_LOCK_EX:
BUG_ON(!lockres->l_ex_holders);
lockres->l_ex_holders--;
break;
- case LKM_PRMODE:
+ case DLM_LOCK_PR:
BUG_ON(!lockres->l_ro_holders);
lockres->l_ro_holders--;
break;
@@ -522,8 +523,8 @@ static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
void user_dlm_cluster_unlock(struct user_lock_res *lockres,
int level)
{
- if (level != LKM_EXMODE &&
- level != LKM_PRMODE) {
+ if (level != DLM_LOCK_EX &&
+ level != DLM_LOCK_PR) {
mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
lockres->l_namelen, lockres->l_name);
return;
@@ -540,33 +541,40 @@ void user_dlm_write_lvb(struct inode *inode,
unsigned int len)
{
struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
- char *lvb = lockres->l_lksb.lvb;
+ char *lvb;

BUG_ON(len > DLM_LVB_LEN);

spin_lock(&lockres->l_lock);

- BUG_ON(lockres->l_level < LKM_EXMODE);
+ BUG_ON(lockres->l_level < DLM_LOCK_EX);
+ lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
memcpy(lvb, val, len);

spin_unlock(&lockres->l_lock);
}

-void user_dlm_read_lvb(struct inode *inode,
- char *val,
- unsigned int len)
+ssize_t user_dlm_read_lvb(struct inode *inode,
+ char *val,
+ unsigned int len)
{
struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
- char *lvb = lockres->l_lksb.lvb;
+ char *lvb;
+ ssize_t ret = len;

BUG_ON(len > DLM_LVB_LEN);

spin_lock(&lockres->l_lock);

- BUG_ON(lockres->l_level < LKM_PRMODE);
- memcpy(val, lvb, len);
+ BUG_ON(lockres->l_level < DLM_LOCK_PR);
+ if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
+ lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
+ memcpy(val, lvb, len);
+ } else
+ ret = 0;

spin_unlock(&lockres->l_lock);
+ return ret;
}

void user_dlm_lock_res_init(struct user_lock_res *lockres,
@@ -576,9 +584,9 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres,

spin_lock_init(&lockres->l_lock);
init_waitqueue_head(&lockres->l_event);
- lockres->l_level = LKM_IVMODE;
- lockres->l_requested = LKM_IVMODE;
- lockres->l_blocking = LKM_IVMODE;
+ lockres->l_level = DLM_LOCK_IV;
+ lockres->l_requested = DLM_LOCK_IV;
+ lockres->l_blocking = DLM_LOCK_IV;

/* should have been checked before getting here. */
BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);
@@ -592,7 +600,8 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres,
int user_dlm_destroy_lock(struct user_lock_res *lockres)
{
int status = -EBUSY;
- struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
+ struct ocfs2_cluster_connection *conn =
+ cluster_connection_from_user_lockres(lockres);

mlog(0, "asked to destroy %.*s\n", lockres->l_namelen, lockres->l_name);

@@ -627,14 +636,9 @@ int user_dlm_destroy_lock(struct user_lock_res *lockres)
lockres->l_flags |= USER_LOCK_BUSY;
spin_unlock(&lockres->l_lock);

- status = dlmunlock(dlm,
- &lockres->l_lksb,
- LKM_VALBLK,
- user_unlock_ast,
- lockres);
- if (status != DLM_NORMAL) {
- user_log_dlm_error("dlmunlock", status, lockres);
- status = -EINVAL;
+ status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
+ if (status) {
+ user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
goto bail;
}

@@ -645,32 +649,34 @@ bail:
return status;
}

-struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
- struct dlm_protocol_version *proto)
+static void user_dlm_recovery_handler_noop(int node_num,
+ void *recovery_data)
{
- struct dlm_ctxt *dlm;
- u32 dlm_key;
- char *domain;
-
- domain = kmalloc(name->len + 1, GFP_NOFS);
- if (!domain) {
- mlog_errno(-ENOMEM);
- return ERR_PTR(-ENOMEM);
- }
+ /* We ignore recovery events */
+ return;
+}

- dlm_key = crc32_le(0, name->name, name->len);
+void user_dlm_set_locking_protocol(void)
+{
+ ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
+}

- snprintf(domain, name->len + 1, "%.*s", name->len, name->name);
+struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name)
+{
+ int rc;
+ struct ocfs2_cluster_connection *conn;

- dlm = dlm_register_domain(domain, dlm_key, proto);
- if (IS_ERR(dlm))
- mlog_errno(PTR_ERR(dlm));
+ rc = ocfs2_cluster_connect("o2cb", name->name, name->len,
+ &user_dlm_lproto,
+ user_dlm_recovery_handler_noop,
+ NULL, &conn);
+ if (rc)
+ mlog_errno(rc);

- kfree(domain);
- return dlm;
+ return rc ? ERR_PTR(rc) : conn;
}

-void user_dlm_unregister_context(struct dlm_ctxt *dlm)
+void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
{
- dlm_unregister_domain(dlm);
+ ocfs2_cluster_disconnect(conn, 0);
}
diff --git a/fs/ocfs2/dlmfs/userdlm.h b/fs/ocfs2/dlmfs/userdlm.h
index 0c3cc03..3b42d79 100644
--- a/fs/ocfs2/dlmfs/userdlm.h
+++ b/fs/ocfs2/dlmfs/userdlm.h
@@ -57,7 +57,7 @@ struct user_lock_res {
int l_level;
unsigned int l_ro_holders;
unsigned int l_ex_holders;
- struct dlm_lockstatus l_lksb;
+ struct ocfs2_dlm_lksb l_lksb;

int l_requested;
int l_blocking;
@@ -80,15 +80,15 @@ void user_dlm_cluster_unlock(struct user_lock_res *lockres,
void user_dlm_write_lvb(struct inode *inode,
const char *val,
unsigned int len);
-void user_dlm_read_lvb(struct inode *inode,
- char *val,
- unsigned int len);
-struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
- struct dlm_protocol_version *proto);
-void user_dlm_unregister_context(struct dlm_ctxt *dlm);
+ssize_t user_dlm_read_lvb(struct inode *inode,
+ char *val,
+ unsigned int len);
+struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name);
+void user_dlm_unregister(struct ocfs2_cluster_connection *conn);
+void user_dlm_set_locking_protocol(void);

struct dlmfs_inode_private {
- struct dlm_ctxt *ip_dlm;
+ struct ocfs2_cluster_connection *ip_conn;

struct user_lock_res ip_lockres; /* unused for directories. */
struct inode *ip_parent;
--
1.6.6.1

2010-02-10 09:31:27

by Joel Becker

[permalink] [raw]
Subject: [PATCH 05/11] ocfs2: Attach the connection to the lksb

We're going to want it in the ast functions, so we convert union
ocfs2_dlm_lksb to struct ocfs2_dlm_lksb and let it carry the connection.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/dlmglue.c | 8 ++++----
fs/ocfs2/ocfs2.h | 2 +-
fs/ocfs2/stack_o2cb.c | 18 +++++++++---------
fs/ocfs2/stack_user.c | 16 ++++++++--------
fs/ocfs2/stackglue.c | 17 +++++++++++------
fs/ocfs2/stackglue.h | 42 +++++++++++++++++++++++-------------------
6 files changed, 56 insertions(+), 47 deletions(-)

diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 28df5f7..4cb3ac2 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -297,7 +297,7 @@ static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

-static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(union ocfs2_dlm_lksb *lksb)
+static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}
@@ -1037,7 +1037,7 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
}


-static void ocfs2_blocking_ast(union ocfs2_dlm_lksb *lksb, int level)
+static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
@@ -1068,7 +1068,7 @@ static void ocfs2_blocking_ast(union ocfs2_dlm_lksb *lksb, int level)
ocfs2_wake_downconvert_thread(osb);
}

-static void ocfs2_locking_ast(union ocfs2_dlm_lksb *lksb)
+static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
@@ -3026,7 +3026,7 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
mlog_exit_void();
}

-static void ocfs2_unlock_ast(union ocfs2_dlm_lksb *lksb, int error)
+static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
unsigned long flags;
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 9362eea..5413f6c 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -155,7 +155,7 @@ struct ocfs2_lock_res {
int l_level;
unsigned int l_ro_holders;
unsigned int l_ex_holders;
- union ocfs2_dlm_lksb l_lksb;
+ struct ocfs2_dlm_lksb l_lksb;

/* used from AST/BAST funcs. */
enum ocfs2_ast_action l_action;
diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c
index e26a789..41e6bad 100644
--- a/fs/ocfs2/stack_o2cb.c
+++ b/fs/ocfs2/stack_o2cb.c
@@ -161,7 +161,7 @@ static int dlm_status_to_errno(enum dlm_status status)

static void o2dlm_lock_ast_wrapper(void *astarg)
{
- union ocfs2_dlm_lksb *lksb = astarg;
+ struct ocfs2_dlm_lksb *lksb = astarg;

BUG_ON(o2cb_stack.sp_proto == NULL);

@@ -170,7 +170,7 @@ static void o2dlm_lock_ast_wrapper(void *astarg)

static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
{
- union ocfs2_dlm_lksb *lksb = astarg;
+ struct ocfs2_dlm_lksb *lksb = astarg;

BUG_ON(o2cb_stack.sp_proto == NULL);

@@ -179,7 +179,7 @@ static void o2dlm_blocking_ast_wrapper(void *astarg, int level)

static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
{
- union ocfs2_dlm_lksb *lksb = astarg;
+ struct ocfs2_dlm_lksb *lksb = astarg;

int error = dlm_status_to_errno(status);

@@ -204,7 +204,7 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)

static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
- union ocfs2_dlm_lksb *lksb,
+ struct ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen)
@@ -223,7 +223,7 @@ static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
}

static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn,
- union ocfs2_dlm_lksb *lksb,
+ struct ocfs2_dlm_lksb *lksb,
u32 flags)
{
enum dlm_status status;
@@ -236,7 +236,7 @@ static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn,
return ret;
}

-static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
+static int o2cb_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
{
return dlm_status_to_errno(lksb->lksb_o2dlm.status);
}
@@ -246,17 +246,17 @@ static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
* contents, it will zero out the LVB. Thus the caller can always trust
* the contents.
*/
-static int o2cb_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb)
+static int o2cb_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
{
return 1;
}

-static void *o2cb_dlm_lvb(union ocfs2_dlm_lksb *lksb)
+static void *o2cb_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
{
return (void *)(lksb->lksb_o2dlm.lvb);
}

-static void o2cb_dump_lksb(union ocfs2_dlm_lksb *lksb)
+static void o2cb_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
dlm_print_one_lock(lksb->lksb_o2dlm.lockid);
}
diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index 129b931..31276ba 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -665,7 +665,7 @@ static void ocfs2_control_exit(void)

static void fsdlm_lock_ast_wrapper(void *astarg)
{
- union ocfs2_dlm_lksb *lksb = astarg;
+ struct ocfs2_dlm_lksb *lksb = astarg;
int status = lksb->lksb_fsdlm.sb_status;

BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
@@ -688,7 +688,7 @@ static void fsdlm_lock_ast_wrapper(void *astarg)

static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
{
- union ocfs2_dlm_lksb *lksb = astarg;
+ struct ocfs2_dlm_lksb *lksb = astarg;

BUG_ON(ocfs2_user_plugin.sp_proto == NULL);

@@ -697,7 +697,7 @@ static void fsdlm_blocking_ast_wrapper(void *astarg, int level)

static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
- union ocfs2_dlm_lksb *lksb,
+ struct ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen)
@@ -716,7 +716,7 @@ static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
}

static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
- union ocfs2_dlm_lksb *lksb,
+ struct ocfs2_dlm_lksb *lksb,
u32 flags)
{
int ret;
@@ -726,19 +726,19 @@ static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
return ret;
}

-static int user_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
+static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
{
return lksb->lksb_fsdlm.sb_status;
}

-static int user_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb)
+static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
{
int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;

return !invalid;
}

-static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb)
+static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
{
if (!lksb->lksb_fsdlm.sb_lvbptr)
lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
@@ -746,7 +746,7 @@ static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb)
return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
}

-static void user_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
+static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
}

diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 3500d98..8ef9a57 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -240,47 +240,52 @@ EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol);
*/
int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
- union ocfs2_dlm_lksb *lksb,
+ struct ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen)
{
BUG_ON(lproto == NULL);

+ if (!lksb->lksb_conn)
+ lksb->lksb_conn = conn;
+ else
+ BUG_ON(lksb->lksb_conn != conn);
return active_stack->sp_ops->dlm_lock(conn, mode, lksb, flags,
name, namelen);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_lock);

int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
- union ocfs2_dlm_lksb *lksb,
+ struct ocfs2_dlm_lksb *lksb,
u32 flags)
{
BUG_ON(lproto == NULL);
+ BUG_ON(lksb->lksb_conn == NULL);

return active_stack->sp_ops->dlm_unlock(conn, lksb, flags);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_unlock);

-int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
+int ocfs2_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
{
return active_stack->sp_ops->lock_status(lksb);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_lock_status);

-int ocfs2_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb)
+int ocfs2_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
{
return active_stack->sp_ops->lvb_valid(lksb);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_lvb_valid);

-void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb)
+void *ocfs2_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
{
return active_stack->sp_ops->lock_lvb(lksb);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_lvb);

-void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
+void ocfs2_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
active_stack->sp_ops->dump_lksb(lksb);
}
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index d699117..bb32926 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -70,10 +70,14 @@ struct fsdlm_lksb_plus_lvb {
* size of the union is known. Lock status structures are embedded in
* ocfs2 inodes.
*/
-union ocfs2_dlm_lksb {
- struct dlm_lockstatus lksb_o2dlm;
- struct dlm_lksb lksb_fsdlm;
- struct fsdlm_lksb_plus_lvb padding;
+struct ocfs2_cluster_connection;
+struct ocfs2_dlm_lksb {
+ union {
+ struct dlm_lockstatus lksb_o2dlm;
+ struct dlm_lksb lksb_fsdlm;
+ struct fsdlm_lksb_plus_lvb padding;
+ };
+ struct ocfs2_cluster_connection *lksb_conn;
};

/*
@@ -81,9 +85,9 @@ union ocfs2_dlm_lksb {
*/
struct ocfs2_locking_protocol {
struct ocfs2_protocol_version lp_max_version;
- void (*lp_lock_ast)(union ocfs2_dlm_lksb *lksb);
- void (*lp_blocking_ast)(union ocfs2_dlm_lksb *lksb, int level);
- void (*lp_unlock_ast)(union ocfs2_dlm_lksb *lksb, int error);
+ void (*lp_lock_ast)(struct ocfs2_dlm_lksb *lksb);
+ void (*lp_blocking_ast)(struct ocfs2_dlm_lksb *lksb, int level);
+ void (*lp_unlock_ast)(struct ocfs2_dlm_lksb *lksb, int error);
};


@@ -161,7 +165,7 @@ struct ocfs2_stack_operations {
*/
int (*dlm_lock)(struct ocfs2_cluster_connection *conn,
int mode,
- union ocfs2_dlm_lksb *lksb,
+ struct ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen);
@@ -176,7 +180,7 @@ struct ocfs2_stack_operations {
* function. The caller can use this to find their object.
*/
int (*dlm_unlock)(struct ocfs2_cluster_connection *conn,
- union ocfs2_dlm_lksb *lksb,
+ struct ocfs2_dlm_lksb *lksb,
u32 flags);

/*
@@ -185,17 +189,17 @@ struct ocfs2_stack_operations {
* callback pulls out the stack-specific lksb, converts the status
* to a proper errno, and returns it.
*/
- int (*lock_status)(union ocfs2_dlm_lksb *lksb);
+ int (*lock_status)(struct ocfs2_dlm_lksb *lksb);

/*
* Return non-zero if the LVB is valid.
*/
- int (*lvb_valid)(union ocfs2_dlm_lksb *lksb);
+ int (*lvb_valid)(struct ocfs2_dlm_lksb *lksb);

/*
* Pull the lvb pointer off of the stack-specific lksb.
*/
- void *(*lock_lvb)(union ocfs2_dlm_lksb *lksb);
+ void *(*lock_lvb)(struct ocfs2_dlm_lksb *lksb);

/*
* Cluster-aware posix locks
@@ -212,7 +216,7 @@ struct ocfs2_stack_operations {
* This is an optoinal debugging hook. If provided, the
* stack can dump debugging information about this lock.
*/
- void (*dump_lksb)(union ocfs2_dlm_lksb *lksb);
+ void (*dump_lksb)(struct ocfs2_dlm_lksb *lksb);
};

/*
@@ -248,18 +252,18 @@ int ocfs2_cluster_this_node(unsigned int *node);
struct ocfs2_lock_res;
int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
- union ocfs2_dlm_lksb *lksb,
+ struct ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen);
int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
- union ocfs2_dlm_lksb *lksb,
+ struct ocfs2_dlm_lksb *lksb,
u32 flags);

-int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb);
-int ocfs2_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb);
-void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb);
-void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb);
+int ocfs2_dlm_lock_status(struct ocfs2_dlm_lksb *lksb);
+int ocfs2_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb);
+void *ocfs2_dlm_lvb(struct ocfs2_dlm_lksb *lksb);
+void ocfs2_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb);

int ocfs2_stack_supports_plocks(void);
int ocfs2_plock(struct ocfs2_cluster_connection *conn, u64 ino,
--
1.6.6.1

2010-02-10 09:30:56

by Joel Becker

[permalink] [raw]
Subject: [PATCH 09/11] ocfs2_dlmfs: Don't honor truncate. The size of a dlmfs file is LVB_LEN

We want folks using dlmfs to be able to use the LVB in places other than
just write(2)/read(2). By ignoring truncate requests, we allow 'echo
"contents" > /dlm/space/lockname' to work.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/dlmfs/dlmfs.c | 18 ++++++++++++++++++
1 files changed, 18 insertions(+), 0 deletions(-)

diff --git a/fs/ocfs2/dlmfs/dlmfs.c b/fs/ocfs2/dlmfs/dlmfs.c
index e21ce0e..13ac2bf 100644
--- a/fs/ocfs2/dlmfs/dlmfs.c
+++ b/fs/ocfs2/dlmfs/dlmfs.c
@@ -220,6 +220,23 @@ static int dlmfs_file_release(struct inode *inode,
return 0;
}

+/*
+ * We do ->setattr() just to override size changes. Our size is the size
+ * of the LVB and nothing else.
+ */
+static int dlmfs_file_setattr(struct dentry *dentry, struct iattr *attr)
+{
+ int error;
+ struct inode *inode = dentry->d_inode;
+
+ attr->ia_valid &= ~ATTR_SIZE;
+ error = inode_change_ok(inode, attr);
+ if (!error)
+ error = inode_setattr(inode, attr);
+
+ return error;
+}
+
static unsigned int dlmfs_file_poll(struct file *file, poll_table *wait)
{
int event = 0;
@@ -634,6 +651,7 @@ static const struct super_operations dlmfs_ops = {

static const struct inode_operations dlmfs_file_inode_operations = {
.getattr = simple_getattr,
+ .setattr = dlmfs_file_setattr,
};

static int dlmfs_get_sb(struct file_system_type *fs_type,
--
1.6.6.1

2010-02-10 09:30:13

by Joel Becker

[permalink] [raw]
Subject: [PATCH 02/11] ocfs2_dlmfs: Use poll() to signify BASTs.

o2dlm's userspace filesystem is an easy way to use the DLM from
userspace. It is intentionally simple. For example, it does not allow
for asynchronous behavior or lock conversion. This is intentional to
keep the interface simple.

Because there is no asynchronous notification, there is no way for a
process holding a lock to know another node needs the lock. This is the
number one complaint of ocfs2_dlmfs users. Turns out, we can solve this
very easily. We add poll() support to ocfs2_dlmfs. When a BAST is
received, the lock's file descriptor will receive POLLIN.

This is trivial to implement. Userdlm already has an appropriate
waitqueue, and the lock knows when it is blocked.

We add the "bast" capability to tell userspace this is available.

Signed-off-by: Joel Becker <[email protected]>
Acked-by: Mark Fasheh <[email protected]>

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/dlm/dlmfs.c | 24 +++++++++++++++++++++++-
1 files changed, 23 insertions(+), 1 deletions(-)

diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlm/dlmfs.c
index 77d0df4..ddf55da 100644
--- a/fs/ocfs2/dlm/dlmfs.c
+++ b/fs/ocfs2/dlm/dlmfs.c
@@ -43,6 +43,7 @@
#include <linux/init.h>
#include <linux/string.h>
#include <linux/backing-dev.h>
+#include <linux/poll.h>

#include <asm/uaccess.h>

@@ -98,8 +99,12 @@ static const struct dlm_protocol_version user_locking_protocol = {
* The ABI features are local to this machine's dlmfs mount. This is
* distinct from the locking protocol, which is concerned with inter-node
* interaction.
+ *
+ * Capabilities:
+ * - bast : POLLIN against the file descriptor of a held lock
+ * signifies a bast fired on the lock.
*/
-#define DLMFS_CAPABILITIES ""
+#define DLMFS_CAPABILITIES "bast"
extern int param_set_dlmfs_capabilities(const char *val,
struct kernel_param *kp)
{
@@ -215,6 +220,22 @@ static int dlmfs_file_release(struct inode *inode,
return 0;
}

+static unsigned int dlmfs_file_poll(struct file *file, poll_table *wait)
+{
+ int event = 0;
+ struct inode *inode = file->f_path.dentry->d_inode;
+ struct dlmfs_inode_private *ip = DLMFS_I(inode);
+
+ poll_wait(file, &ip->ip_lockres.l_event, wait);
+
+ spin_lock(&ip->ip_lockres.l_lock);
+ if (ip->ip_lockres.l_flags & USER_LOCK_BLOCKED)
+ event = POLLIN | POLLRDNORM;
+ spin_unlock(&ip->ip_lockres.l_lock);
+
+ return event;
+}
+
static ssize_t dlmfs_file_read(struct file *filp,
char __user *buf,
size_t count,
@@ -585,6 +606,7 @@ static int dlmfs_fill_super(struct super_block * sb,
static const struct file_operations dlmfs_file_operations = {
.open = dlmfs_file_open,
.release = dlmfs_file_release,
+ .poll = dlmfs_file_poll,
.read = dlmfs_file_read,
.write = dlmfs_file_write,
};
--
1.6.6.1

2010-02-10 09:31:23

by Joel Becker

[permalink] [raw]
Subject: [PATCH 11/11] ocfs2_dlmfs: Enable the use of user cluster stacks.

Unlike ocfs2, dlmfs has no permanent storage. It can't store off a
cluster stack it is supposed to be using. So it can't specify the stack
name in ocfs2_cluster_connect().

Instead, we create ocfs2_cluster_connect_agnostic(), which simply uses
the stack that is currently enabled. This is find for dlmfs, which will
rely on the stack initialization.

We add the "stackglue" capability to dlmfs's capability list. This lets
userspace know dlmfs can be used with all cluster stacks.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/dlmfs/dlmfs.c | 2 +-
fs/ocfs2/dlmfs/userdlm.c | 8 ++++----
fs/ocfs2/ocfs2_lockingver.h | 2 ++
fs/ocfs2/stackglue.c | 18 ++++++++++++++++++
fs/ocfs2/stackglue.h | 11 +++++++++++
5 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/fs/ocfs2/dlmfs/dlmfs.c b/fs/ocfs2/dlmfs/dlmfs.c
index 8697366..1b0de15 100644
--- a/fs/ocfs2/dlmfs/dlmfs.c
+++ b/fs/ocfs2/dlmfs/dlmfs.c
@@ -87,7 +87,7 @@ struct workqueue_struct *user_dlm_worker;
* - bast : POLLIN against the file descriptor of a held lock
* signifies a bast fired on the lock.
*/
-#define DLMFS_CAPABILITIES "bast"
+#define DLMFS_CAPABILITIES "bast stackglue"
extern int param_set_dlmfs_capabilities(const char *val,
struct kernel_param *kp)
{
diff --git a/fs/ocfs2/dlmfs/userdlm.c b/fs/ocfs2/dlmfs/userdlm.c
index c1b6a56..2858ee6 100644
--- a/fs/ocfs2/dlmfs/userdlm.c
+++ b/fs/ocfs2/dlmfs/userdlm.c
@@ -666,10 +666,10 @@ struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name)
int rc;
struct ocfs2_cluster_connection *conn;

- rc = ocfs2_cluster_connect("o2cb", name->name, name->len,
- &user_dlm_lproto,
- user_dlm_recovery_handler_noop,
- NULL, &conn);
+ rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
+ &user_dlm_lproto,
+ user_dlm_recovery_handler_noop,
+ NULL, &conn);
if (rc)
mlog_errno(rc);

diff --git a/fs/ocfs2/ocfs2_lockingver.h b/fs/ocfs2/ocfs2_lockingver.h
index 82d5eea..2e45c8d 100644
--- a/fs/ocfs2/ocfs2_lockingver.h
+++ b/fs/ocfs2/ocfs2_lockingver.h
@@ -23,6 +23,8 @@
/*
* The protocol version for ocfs2 cluster locking. See dlmglue.c for
* more details.
+ *
+ * 1.0 - Initial locking version from ocfs2 1.4.
*/
#define OCFS2_LOCKING_PROTOCOL_MAJOR 1
#define OCFS2_LOCKING_PROTOCOL_MINOR 0
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 31db2e8..39abf89 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -373,6 +373,24 @@ out:
}
EXPORT_SYMBOL_GPL(ocfs2_cluster_connect);

+/* The caller will ensure all nodes have the same cluster stack */
+int ocfs2_cluster_connect_agnostic(const char *group,
+ int grouplen,
+ struct ocfs2_locking_protocol *lproto,
+ void (*recovery_handler)(int node_num,
+ void *recovery_data),
+ void *recovery_data,
+ struct ocfs2_cluster_connection **conn)
+{
+ char *stack_name = NULL;
+
+ if (cluster_stack_name[0])
+ stack_name = cluster_stack_name;
+ return ocfs2_cluster_connect(stack_name, group, grouplen, lproto,
+ recovery_handler, recovery_data, conn);
+}
+EXPORT_SYMBOL_GPL(ocfs2_cluster_connect_agnostic);
+
/* If hangup_pending is 0, the stack driver will be dropped */
int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn,
int hangup_pending)
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index b1981ba..8ce7398 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -246,6 +246,17 @@ int ocfs2_cluster_connect(const char *stack_name,
void *recovery_data),
void *recovery_data,
struct ocfs2_cluster_connection **conn);
+/*
+ * Used by callers that don't store their stack name. They must ensure
+ * all nodes have the same stack.
+ */
+int ocfs2_cluster_connect_agnostic(const char *group,
+ int grouplen,
+ struct ocfs2_locking_protocol *lproto,
+ void (*recovery_handler)(int node_num,
+ void *recovery_data),
+ void *recovery_data,
+ struct ocfs2_cluster_connection **conn);
int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn,
int hangup_pending);
void ocfs2_cluster_hangup(const char *group, int grouplen);
--
1.6.6.1

2010-02-10 09:30:53

by Joel Becker

[permalink] [raw]
Subject: [PATCH 01/11] ocfs2_dlmfs: Add capabilities parameter.

Over time, dlmfs has added some features that were not part of the
initial ABI. Unfortunately, some of these features are not detectable
via standard usage. For example, Linux's default poll always returns
POLLIN, so there is no way for a caller of poll(2) to know when dlmfs
added poll support. Instead, we provide this list of new capabilities.

Capabilities is a read-only attribute. We do it as a module parameter
so we can discover it whether dlmfs is built in, loaded, or even not
loaded (via modinfo).

The ABI features are local to this machine's dlmfs mount. This is
distinct from the locking protocol, which is concerned with inter-node
interaction.

Signed-off-by: Joel Becker <[email protected]>
---
fs/ocfs2/dlm/dlmfs.c | 36 ++++++++++++++++++++++++++++++++++++
1 files changed, 36 insertions(+), 0 deletions(-)

diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlm/dlmfs.c
index 02bf178..77d0df4 100644
--- a/fs/ocfs2/dlm/dlmfs.c
+++ b/fs/ocfs2/dlm/dlmfs.c
@@ -81,6 +81,42 @@ static const struct dlm_protocol_version user_locking_protocol = {
.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
};

+
+/*
+ * These are the ABI capabilities of dlmfs.
+ *
+ * Over time, dlmfs has added some features that were not part of the
+ * initial ABI. Unfortunately, some of these features are not detectable
+ * via standard usage. For example, Linux's default poll always returns
+ * POLLIN, so there is no way for a caller of poll(2) to know when dlmfs
+ * added poll support. Instead, we provide this list of new capabilities.
+ *
+ * Capabilities is a read-only attribute. We do it as a module parameter
+ * so we can discover it whether dlmfs is built in, loaded, or even not
+ * loaded.
+ *
+ * The ABI features are local to this machine's dlmfs mount. This is
+ * distinct from the locking protocol, which is concerned with inter-node
+ * interaction.
+ */
+#define DLMFS_CAPABILITIES ""
+extern int param_set_dlmfs_capabilities(const char *val,
+ struct kernel_param *kp)
+{
+ printk(KERN_ERR "%s: readonly parameter\n", kp->name);
+ return -EINVAL;
+}
+static int param_get_dlmfs_capabilities(char *buffer,
+ struct kernel_param *kp)
+{
+ return strlcpy(buffer, DLMFS_CAPABILITIES,
+ strlen(DLMFS_CAPABILITIES) + 1);
+}
+module_param_call(capabilities, param_set_dlmfs_capabilities,
+ param_get_dlmfs_capabilities, NULL, 0444);
+MODULE_PARM_DESC(capabilities, DLMFS_CAPABILITIES);
+
+
/*
* decodes a set of open flags into a valid lock level and a set of flags.
* returns < 0 if we have invalid flags
--
1.6.6.1

2010-02-11 20:41:54

by Sunil Mushran

[permalink] [raw]
Subject: Re: [Ocfs2-devel] [PATCH 01/11] ocfs2_dlmfs: Add capabilities parameter.

Signed-off-by: Sunil Mushran <[email protected]>


Joel Becker wrote:
> Over time, dlmfs has added some features that were not part of the
> initial ABI. Unfortunately, some of these features are not detectable
> via standard usage. For example, Linux's default poll always returns
> POLLIN, so there is no way for a caller of poll(2) to know when dlmfs
> added poll support. Instead, we provide this list of new capabilities.
>
> Capabilities is a read-only attribute. We do it as a module parameter
> so we can discover it whether dlmfs is built in, loaded, or even not
> loaded (via modinfo).
>
> The ABI features are local to this machine's dlmfs mount. This is
> distinct from the locking protocol, which is concerned with inter-node
> interaction.
>
> Signed-off-by: Joel Becker <[email protected]>
> ---
> fs/ocfs2/dlm/dlmfs.c | 36 ++++++++++++++++++++++++++++++++++++
> 1 files changed, 36 insertions(+), 0 deletions(-)
>
> diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlm/dlmfs.c
> index 02bf178..77d0df4 100644
> --- a/fs/ocfs2/dlm/dlmfs.c
> +++ b/fs/ocfs2/dlm/dlmfs.c
> @@ -81,6 +81,42 @@ static const struct dlm_protocol_version user_locking_protocol = {
> .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
> };
>
> +
> +/*
> + * These are the ABI capabilities of dlmfs.
> + *
> + * Over time, dlmfs has added some features that were not part of the
> + * initial ABI. Unfortunately, some of these features are not detectable
> + * via standard usage. For example, Linux's default poll always returns
> + * POLLIN, so there is no way for a caller of poll(2) to know when dlmfs
> + * added poll support. Instead, we provide this list of new capabilities.
> + *
> + * Capabilities is a read-only attribute. We do it as a module parameter
> + * so we can discover it whether dlmfs is built in, loaded, or even not
> + * loaded.
> + *
> + * The ABI features are local to this machine's dlmfs mount. This is
> + * distinct from the locking protocol, which is concerned with inter-node
> + * interaction.
> + */
> +#define DLMFS_CAPABILITIES ""
> +extern int param_set_dlmfs_capabilities(const char *val,
> + struct kernel_param *kp)
> +{
> + printk(KERN_ERR "%s: readonly parameter\n", kp->name);
> + return -EINVAL;
> +}
> +static int param_get_dlmfs_capabilities(char *buffer,
> + struct kernel_param *kp)
> +{
> + return strlcpy(buffer, DLMFS_CAPABILITIES,
> + strlen(DLMFS_CAPABILITIES) + 1);
> +}
> +module_param_call(capabilities, param_set_dlmfs_capabilities,
> + param_get_dlmfs_capabilities, NULL, 0444);
> +MODULE_PARM_DESC(capabilities, DLMFS_CAPABILITIES);
> +
> +
> /*
> * decodes a set of open flags into a valid lock level and a set of flags.
> * returns < 0 if we have invalid flags
>

2010-02-11 21:12:54

by Sunil Mushran

[permalink] [raw]
Subject: Re: [Ocfs2-devel] [PATCH 03/11] ocfs2_dlmfs: Move to its own directory

Acked-by: Sunil Mushran <[email protected]>

Joel Becker wrote:
> We're going to remove the tie between ocfs2_dlmfs and o2dlm.
> ocfs2_dlmfs doesn't belong in the fs/ocfs2/dlm directory anymore. Here
> we move it to fs/ocfs2/dlmfs.
>
> Signed-off-by: Joel Becker <[email protected]>
> ---
> fs/ocfs2/Makefile | 1 +
> fs/ocfs2/dlm/Makefile | 3 +--
> fs/ocfs2/dlmfs/Makefile | 5 +++++
> fs/ocfs2/{dlm => dlmfs}/dlmfs.c | 2 +-
> fs/ocfs2/{dlm => dlmfs}/dlmfsver.c | 0
> fs/ocfs2/{dlm => dlmfs}/dlmfsver.h | 0
> fs/ocfs2/{dlm => dlmfs}/userdlm.c | 2 +-
> fs/ocfs2/{dlm => dlmfs}/userdlm.h | 0
> 8 files changed, 9 insertions(+), 4 deletions(-)
> create mode 100644 fs/ocfs2/dlmfs/Makefile
> rename fs/ocfs2/{dlm => dlmfs}/dlmfs.c (99%)
> rename fs/ocfs2/{dlm => dlmfs}/dlmfsver.c (100%)
> rename fs/ocfs2/{dlm => dlmfs}/dlmfsver.h (100%)
> rename fs/ocfs2/{dlm => dlmfs}/userdlm.c (99%)
> rename fs/ocfs2/{dlm => dlmfs}/userdlm.h (100%)
>
> diff --git a/fs/ocfs2/Makefile b/fs/ocfs2/Makefile
> index 600d2d2..791c088 100644
> --- a/fs/ocfs2/Makefile
> +++ b/fs/ocfs2/Makefile
> @@ -46,6 +46,7 @@ ocfs2_stackglue-objs := stackglue.o
> ocfs2_stack_o2cb-objs := stack_o2cb.o
> ocfs2_stack_user-objs := stack_user.o
>
> +obj-$(CONFIG_OCFS2_FS) += dlmfs/
> # cluster/ is always needed when OCFS2_FS for masklog support
> obj-$(CONFIG_OCFS2_FS) += cluster/
> obj-$(CONFIG_OCFS2_FS_O2CB) += dlm/
> diff --git a/fs/ocfs2/dlm/Makefile b/fs/ocfs2/dlm/Makefile
> index 1903613..dcebf0d 100644
> --- a/fs/ocfs2/dlm/Makefile
> +++ b/fs/ocfs2/dlm/Makefile
> @@ -1,8 +1,7 @@
> EXTRA_CFLAGS += -Ifs/ocfs2
>
> -obj-$(CONFIG_OCFS2_FS_O2CB) += ocfs2_dlm.o ocfs2_dlmfs.o
> +obj-$(CONFIG_OCFS2_FS_O2CB) += ocfs2_dlm.o
>
> ocfs2_dlm-objs := dlmdomain.o dlmdebug.o dlmthread.o dlmrecovery.o \
> dlmmaster.o dlmast.o dlmconvert.o dlmlock.o dlmunlock.o dlmver.o
>
> -ocfs2_dlmfs-objs := userdlm.o dlmfs.o dlmfsver.o
> diff --git a/fs/ocfs2/dlmfs/Makefile b/fs/ocfs2/dlmfs/Makefile
> new file mode 100644
> index 0000000..df69b48
> --- /dev/null
> +++ b/fs/ocfs2/dlmfs/Makefile
> @@ -0,0 +1,5 @@
> +EXTRA_CFLAGS += -Ifs/ocfs2
> +
> +obj-$(CONFIG_OCFS2_FS) += ocfs2_dlmfs.o
> +
> +ocfs2_dlmfs-objs := userdlm.o dlmfs.o dlmfsver.o
> diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlmfs/dlmfs.c
> similarity index 99%
> rename from fs/ocfs2/dlm/dlmfs.c
> rename to fs/ocfs2/dlmfs/dlmfs.c
> index ddf55da..e21ce0e 100644
> --- a/fs/ocfs2/dlm/dlmfs.c
> +++ b/fs/ocfs2/dlmfs/dlmfs.c
> @@ -52,7 +52,7 @@
> #include "cluster/heartbeat.h"
> #include "cluster/tcp.h"
>
> -#include "dlmapi.h"
> +#include "dlm/dlmapi.h"
>
> #include "userdlm.h"
>
> diff --git a/fs/ocfs2/dlm/dlmfsver.c b/fs/ocfs2/dlmfs/dlmfsver.c
> similarity index 100%
> rename from fs/ocfs2/dlm/dlmfsver.c
> rename to fs/ocfs2/dlmfs/dlmfsver.c
> diff --git a/fs/ocfs2/dlm/dlmfsver.h b/fs/ocfs2/dlmfs/dlmfsver.h
> similarity index 100%
> rename from fs/ocfs2/dlm/dlmfsver.h
> rename to fs/ocfs2/dlmfs/dlmfsver.h
> diff --git a/fs/ocfs2/dlm/userdlm.c b/fs/ocfs2/dlmfs/userdlm.c
> similarity index 99%
> rename from fs/ocfs2/dlm/userdlm.c
> rename to fs/ocfs2/dlmfs/userdlm.c
> index 4cb1d3d..6adae70 100644
> --- a/fs/ocfs2/dlm/userdlm.c
> +++ b/fs/ocfs2/dlmfs/userdlm.c
> @@ -39,7 +39,7 @@
> #include "cluster/heartbeat.h"
> #include "cluster/tcp.h"
>
> -#include "dlmapi.h"
> +#include "dlm/dlmapi.h"
>
> #include "userdlm.h"
>
> diff --git a/fs/ocfs2/dlm/userdlm.h b/fs/ocfs2/dlmfs/userdlm.h
> similarity index 100%
> rename from fs/ocfs2/dlm/userdlm.h
> rename to fs/ocfs2/dlmfs/userdlm.h
>

2010-02-11 21:21:10

by Sunil Mushran

[permalink] [raw]
Subject: Re: [Ocfs2-devel] [PATCH 04/11] ocfs2: Pass lksbs back from stackglue ast/bast functions.

Acked-by: Sunil Mushran <[email protected]>

Joel Becker wrote:
> The stackglue ast and bast functions tried to maintain the fiction that
> their arguments were void pointers. In reality, stack_user.c had to
> know that the argument was an ocfs2_lock_res in order to get the status
> off of the lksb. That's ugly.
>
> This changes stackglue to always pass the lksb as the argument to ast
> and bast functions. The caller can always use container_of() to get the
> ocfs2_lock_res or user_dlm_lock_res. The net effect to the caller is
> zero. They still get back the lockres in their ast. stackglue gets
> cleaner, and now can use the lksb itself.
>
> Signed-off-by: Joel Becker <[email protected]>
> ---
> fs/ocfs2/dlmglue.c | 34 +++++++++++++++++-----------------
> fs/ocfs2/stack_o2cb.c | 22 +++++++++++++---------
> fs/ocfs2/stack_user.c | 29 +++++++++++------------------
> fs/ocfs2/stackglue.c | 19 ++++++++-----------
> fs/ocfs2/stackglue.h | 42 +++++++++++++++++++++---------------------
> 5 files changed, 70 insertions(+), 76 deletions(-)
>
> diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
> index c5e4a49..28df5f7 100644
> --- a/fs/ocfs2/dlmglue.c
> +++ b/fs/ocfs2/dlmglue.c
> @@ -297,6 +297,11 @@ static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
> lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
> }
>
> +static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(union ocfs2_dlm_lksb *lksb)
> +{
> + return container_of(lksb, struct ocfs2_lock_res, l_lksb);
> +}
> +
> static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
> {
> BUG_ON(!ocfs2_is_inode_lock(lockres));
> @@ -1032,9 +1037,9 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
> }
>
>
> -static void ocfs2_blocking_ast(void *opaque, int level)
> +static void ocfs2_blocking_ast(union ocfs2_dlm_lksb *lksb, int level)
> {
> - struct ocfs2_lock_res *lockres = opaque;
> + struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
> int needs_downconvert;
> unsigned long flags;
> @@ -1063,9 +1068,9 @@ static void ocfs2_blocking_ast(void *opaque, int level)
> ocfs2_wake_downconvert_thread(osb);
> }
>
> -static void ocfs2_locking_ast(void *opaque)
> +static void ocfs2_locking_ast(union ocfs2_dlm_lksb *lksb)
> {
> - struct ocfs2_lock_res *lockres = opaque;
> + struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
> unsigned long flags;
> int status;
> @@ -1179,8 +1184,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
> &lockres->l_lksb,
> dlm_flags,
> lockres->l_name,
> - OCFS2_LOCK_ID_MAX_LEN - 1,
> - lockres);
> + OCFS2_LOCK_ID_MAX_LEN - 1);
> lockres_clear_pending(lockres, gen, osb);
> if (ret) {
> ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
> @@ -1392,8 +1396,7 @@ again:
> &lockres->l_lksb,
> lkm_flags,
> lockres->l_name,
> - OCFS2_LOCK_ID_MAX_LEN - 1,
> - lockres);
> + OCFS2_LOCK_ID_MAX_LEN - 1);
> lockres_clear_pending(lockres, gen, osb);
> if (ret) {
> if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
> @@ -1827,8 +1830,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
> spin_unlock_irqrestore(&lockres->l_lock, flags);
>
> ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
> - lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
> - lockres);
> + lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
> if (ret) {
> if (!trylock || (ret != -EAGAIN)) {
> ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
> @@ -3024,9 +3026,9 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
> mlog_exit_void();
> }
>
> -static void ocfs2_unlock_ast(void *opaque, int error)
> +static void ocfs2_unlock_ast(union ocfs2_dlm_lksb *lksb, int error)
> {
> - struct ocfs2_lock_res *lockres = opaque;
> + struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> unsigned long flags;
>
> mlog_entry_void();
> @@ -3135,8 +3137,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
>
> mlog(0, "lock %s\n", lockres->l_name);
>
> - ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
> - lockres);
> + ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
> if (ret) {
> ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
> mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
> @@ -3277,8 +3278,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
> &lockres->l_lksb,
> dlm_flags,
> lockres->l_name,
> - OCFS2_LOCK_ID_MAX_LEN - 1,
> - lockres);
> + OCFS2_LOCK_ID_MAX_LEN - 1);
> lockres_clear_pending(lockres, generation, osb);
> if (ret) {
> ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
> @@ -3333,7 +3333,7 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
> mlog(0, "lock %s\n", lockres->l_name);
>
> ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
> - DLM_LKF_CANCEL, lockres);
> + DLM_LKF_CANCEL);
> if (ret) {
> ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
> ocfs2_recover_from_dlm_error(lockres, 0);
> diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c
> index e49c410..e26a789 100644
> --- a/fs/ocfs2/stack_o2cb.c
> +++ b/fs/ocfs2/stack_o2cb.c
> @@ -161,20 +161,26 @@ static int dlm_status_to_errno(enum dlm_status status)
>
> static void o2dlm_lock_ast_wrapper(void *astarg)
> {
> + union ocfs2_dlm_lksb *lksb = astarg;
> +
> BUG_ON(o2cb_stack.sp_proto == NULL);
>
> - o2cb_stack.sp_proto->lp_lock_ast(astarg);
> + o2cb_stack.sp_proto->lp_lock_ast(lksb);
> }
>
> static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
> {
> + union ocfs2_dlm_lksb *lksb = astarg;
> +
> BUG_ON(o2cb_stack.sp_proto == NULL);
>
> - o2cb_stack.sp_proto->lp_blocking_ast(astarg, level);
> + o2cb_stack.sp_proto->lp_blocking_ast(lksb, level);
> }
>
> static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
> {
> + union ocfs2_dlm_lksb *lksb = astarg;
> +
> int error = dlm_status_to_errno(status);
>
> BUG_ON(o2cb_stack.sp_proto == NULL);
> @@ -193,7 +199,7 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
> if (status == DLM_CANCELGRANT)
> return;
>
> - o2cb_stack.sp_proto->lp_unlock_ast(astarg, error);
> + o2cb_stack.sp_proto->lp_unlock_ast(lksb, error);
> }
>
> static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
> @@ -201,8 +207,7 @@ static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
> union ocfs2_dlm_lksb *lksb,
> u32 flags,
> void *name,
> - unsigned int namelen,
> - void *astarg)
> + unsigned int namelen)
> {
> enum dlm_status status;
> int o2dlm_mode = mode_to_o2dlm(mode);
> @@ -211,7 +216,7 @@ static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
>
> status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm,
> o2dlm_flags, name, namelen,
> - o2dlm_lock_ast_wrapper, astarg,
> + o2dlm_lock_ast_wrapper, lksb,
> o2dlm_blocking_ast_wrapper);
> ret = dlm_status_to_errno(status);
> return ret;
> @@ -219,15 +224,14 @@ static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
>
> static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn,
> union ocfs2_dlm_lksb *lksb,
> - u32 flags,
> - void *astarg)
> + u32 flags)
> {
> enum dlm_status status;
> int o2dlm_flags = flags_to_o2dlm(flags);
> int ret;
>
> status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm,
> - o2dlm_flags, o2dlm_unlock_ast_wrapper, astarg);
> + o2dlm_flags, o2dlm_unlock_ast_wrapper, lksb);
> ret = dlm_status_to_errno(status);
> return ret;
> }
> diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
> index da78a2a..129b931 100644
> --- a/fs/ocfs2/stack_user.c
> +++ b/fs/ocfs2/stack_user.c
> @@ -25,7 +25,6 @@
> #include <linux/reboot.h>
> #include <asm/uaccess.h>
>
> -#include "ocfs2.h" /* For struct ocfs2_lock_res */
> #include "stackglue.h"
>
> #include <linux/dlm_plock.h>
> @@ -664,16 +663,10 @@ static void ocfs2_control_exit(void)
> -rc);
> }
>
> -static struct dlm_lksb *fsdlm_astarg_to_lksb(void *astarg)
> -{
> - struct ocfs2_lock_res *res = astarg;
> - return &res->l_lksb.lksb_fsdlm;
> -}
> -
> static void fsdlm_lock_ast_wrapper(void *astarg)
> {
> - struct dlm_lksb *lksb = fsdlm_astarg_to_lksb(astarg);
> - int status = lksb->sb_status;
> + union ocfs2_dlm_lksb *lksb = astarg;
> + int status = lksb->lksb_fsdlm.sb_status;
>
> BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
>
> @@ -688,16 +681,18 @@ static void fsdlm_lock_ast_wrapper(void *astarg)
> */
>
> if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
> - ocfs2_user_plugin.sp_proto->lp_unlock_ast(astarg, 0);
> + ocfs2_user_plugin.sp_proto->lp_unlock_ast(lksb, 0);
> else
> - ocfs2_user_plugin.sp_proto->lp_lock_ast(astarg);
> + ocfs2_user_plugin.sp_proto->lp_lock_ast(lksb);
> }
>
> static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
> {
> + union ocfs2_dlm_lksb *lksb = astarg;
> +
> BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
>
> - ocfs2_user_plugin.sp_proto->lp_blocking_ast(astarg, level);
> + ocfs2_user_plugin.sp_proto->lp_blocking_ast(lksb, level);
> }
>
> static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
> @@ -705,8 +700,7 @@ static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
> union ocfs2_dlm_lksb *lksb,
> u32 flags,
> void *name,
> - unsigned int namelen,
> - void *astarg)
> + unsigned int namelen)
> {
> int ret;
>
> @@ -716,20 +710,19 @@ static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
>
> ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
> flags|DLM_LKF_NODLCKWT, name, namelen, 0,
> - fsdlm_lock_ast_wrapper, astarg,
> + fsdlm_lock_ast_wrapper, lksb,
> fsdlm_blocking_ast_wrapper);
> return ret;
> }
>
> static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
> union ocfs2_dlm_lksb *lksb,
> - u32 flags,
> - void *astarg)
> + u32 flags)
> {
> int ret;
>
> ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
> - flags, &lksb->lksb_fsdlm, astarg);
> + flags, &lksb->lksb_fsdlm, lksb);
> return ret;
> }
>
> diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
> index f3df0ba..3500d98 100644
> --- a/fs/ocfs2/stackglue.c
> +++ b/fs/ocfs2/stackglue.c
> @@ -233,35 +233,32 @@ EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol);
>
>
> /*
> - * The ocfs2_dlm_lock() and ocfs2_dlm_unlock() functions take
> - * "struct ocfs2_lock_res *astarg" instead of "void *astarg" because the
> - * underlying stack plugins need to pilfer the lksb off of the lock_res.
> - * If some other structure needs to be passed as an astarg, the plugins
> - * will need to be given a different avenue to the lksb.
> + * The ocfs2_dlm_lock() and ocfs2_dlm_unlock() functions take no argument
> + * for the ast and bast functions. They will pass the lksb to the ast
> + * and bast. The caller can wrap the lksb with their own structure to
> + * get more information.
> */
> int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
> int mode,
> union ocfs2_dlm_lksb *lksb,
> u32 flags,
> void *name,
> - unsigned int namelen,
> - struct ocfs2_lock_res *astarg)
> + unsigned int namelen)
> {
> BUG_ON(lproto == NULL);
>
> return active_stack->sp_ops->dlm_lock(conn, mode, lksb, flags,
> - name, namelen, astarg);
> + name, namelen);
> }
> EXPORT_SYMBOL_GPL(ocfs2_dlm_lock);
>
> int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
> union ocfs2_dlm_lksb *lksb,
> - u32 flags,
> - struct ocfs2_lock_res *astarg)
> + u32 flags)
> {
> BUG_ON(lproto == NULL);
>
> - return active_stack->sp_ops->dlm_unlock(conn, lksb, flags, astarg);
> + return active_stack->sp_ops->dlm_unlock(conn, lksb, flags);
> }
> EXPORT_SYMBOL_GPL(ocfs2_dlm_unlock);
>
> diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
> index 03a44d6..d699117 100644
> --- a/fs/ocfs2/stackglue.h
> +++ b/fs/ocfs2/stackglue.h
> @@ -56,17 +56,6 @@ struct ocfs2_protocol_version {
> };
>
> /*
> - * The ocfs2_locking_protocol defines the handlers called on ocfs2's behalf.
> - */
> -struct ocfs2_locking_protocol {
> - struct ocfs2_protocol_version lp_max_version;
> - void (*lp_lock_ast)(void *astarg);
> - void (*lp_blocking_ast)(void *astarg, int level);
> - void (*lp_unlock_ast)(void *astarg, int error);
> -};
> -
> -
> -/*
> * The dlm_lockstatus struct includes lvb space, but the dlm_lksb struct only
> * has a pointer to separately allocated lvb space. This struct exists only to
> * include in the lksb union to make space for a combined dlm_lksb and lvb.
> @@ -88,6 +77,17 @@ union ocfs2_dlm_lksb {
> };
>
> /*
> + * The ocfs2_locking_protocol defines the handlers called on ocfs2's behalf.
> + */
> +struct ocfs2_locking_protocol {
> + struct ocfs2_protocol_version lp_max_version;
> + void (*lp_lock_ast)(union ocfs2_dlm_lksb *lksb);
> + void (*lp_blocking_ast)(union ocfs2_dlm_lksb *lksb, int level);
> + void (*lp_unlock_ast)(union ocfs2_dlm_lksb *lksb, int error);
> +};
> +
> +
> +/*
> * A cluster connection. Mostly opaque to ocfs2, the connection holds
> * state for the underlying stack. ocfs2 does use cc_version to determine
> * locking compatibility.
> @@ -155,27 +155,29 @@ struct ocfs2_stack_operations {
> *
> * ast and bast functions are not part of the call because the
> * stack will likely want to wrap ast and bast calls before passing
> - * them to stack->sp_proto.
> + * them to stack->sp_proto. There is no astarg. The lksb will
> + * be passed back to the ast and bast functions. The caller can
> + * use this to find their object.
> */
> int (*dlm_lock)(struct ocfs2_cluster_connection *conn,
> int mode,
> union ocfs2_dlm_lksb *lksb,
> u32 flags,
> void *name,
> - unsigned int namelen,
> - void *astarg);
> + unsigned int namelen);
>
> /*
> * Call the underlying dlm unlock function. The ->dlm_unlock()
> * function should convert the flags as appropriate.
> *
> * The unlock ast is not passed, as the stack will want to wrap
> - * it before calling stack->sp_proto->lp_unlock_ast().
> + * it before calling stack->sp_proto->lp_unlock_ast(). There is
> + * no astarg. The lksb will be passed back to the unlock ast
> + * function. The caller can use this to find their object.
> */
> int (*dlm_unlock)(struct ocfs2_cluster_connection *conn,
> union ocfs2_dlm_lksb *lksb,
> - u32 flags,
> - void *astarg);
> + u32 flags);
>
> /*
> * Return the status of the current lock status block. The fs
> @@ -249,12 +251,10 @@ int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
> union ocfs2_dlm_lksb *lksb,
> u32 flags,
> void *name,
> - unsigned int namelen,
> - struct ocfs2_lock_res *astarg);
> + unsigned int namelen);
> int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
> union ocfs2_dlm_lksb *lksb,
> - u32 flags,
> - struct ocfs2_lock_res *astarg);
> + u32 flags);
>
> int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb);
> int ocfs2_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb);
>

2010-02-12 23:58:40

by Sunil Mushran

[permalink] [raw]
Subject: Re: [Ocfs2-devel] [PATCH 05/11] ocfs2: Attach the connection to the lksb

Acked-by: Sunil Mushran <[email protected]>

Joel Becker wrote:
> We're going to want it in the ast functions, so we convert union
> ocfs2_dlm_lksb to struct ocfs2_dlm_lksb and let it carry the connection.
>
> Signed-off-by: Joel Becker <[email protected]>
> ---
> fs/ocfs2/dlmglue.c | 8 ++++----
> fs/ocfs2/ocfs2.h | 2 +-
> fs/ocfs2/stack_o2cb.c | 18 +++++++++---------
> fs/ocfs2/stack_user.c | 16 ++++++++--------
> fs/ocfs2/stackglue.c | 17 +++++++++++------
> fs/ocfs2/stackglue.h | 42 +++++++++++++++++++++++-------------------
> 6 files changed, 56 insertions(+), 47 deletions(-)
>
> diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
> index 28df5f7..4cb3ac2 100644
> --- a/fs/ocfs2/dlmglue.c
> +++ b/fs/ocfs2/dlmglue.c
> @@ -297,7 +297,7 @@ static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
> lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
> }
>
> -static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(union ocfs2_dlm_lksb *lksb)
> +static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
> {
> return container_of(lksb, struct ocfs2_lock_res, l_lksb);
> }
> @@ -1037,7 +1037,7 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
> }
>
>
> -static void ocfs2_blocking_ast(union ocfs2_dlm_lksb *lksb, int level)
> +static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
> {
> struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
> @@ -1068,7 +1068,7 @@ static void ocfs2_blocking_ast(union ocfs2_dlm_lksb *lksb, int level)
> ocfs2_wake_downconvert_thread(osb);
> }
>
> -static void ocfs2_locking_ast(union ocfs2_dlm_lksb *lksb)
> +static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
> {
> struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
> @@ -3026,7 +3026,7 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
> mlog_exit_void();
> }
>
> -static void ocfs2_unlock_ast(union ocfs2_dlm_lksb *lksb, int error)
> +static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
> {
> struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> unsigned long flags;
> diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
> index 9362eea..5413f6c 100644
> --- a/fs/ocfs2/ocfs2.h
> +++ b/fs/ocfs2/ocfs2.h
> @@ -155,7 +155,7 @@ struct ocfs2_lock_res {
> int l_level;
> unsigned int l_ro_holders;
> unsigned int l_ex_holders;
> - union ocfs2_dlm_lksb l_lksb;
> + struct ocfs2_dlm_lksb l_lksb;
>
> /* used from AST/BAST funcs. */
> enum ocfs2_ast_action l_action;
> diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c
> index e26a789..41e6bad 100644
> --- a/fs/ocfs2/stack_o2cb.c
> +++ b/fs/ocfs2/stack_o2cb.c
> @@ -161,7 +161,7 @@ static int dlm_status_to_errno(enum dlm_status status)
>
> static void o2dlm_lock_ast_wrapper(void *astarg)
> {
> - union ocfs2_dlm_lksb *lksb = astarg;
> + struct ocfs2_dlm_lksb *lksb = astarg;
>
> BUG_ON(o2cb_stack.sp_proto == NULL);
>
> @@ -170,7 +170,7 @@ static void o2dlm_lock_ast_wrapper(void *astarg)
>
> static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
> {
> - union ocfs2_dlm_lksb *lksb = astarg;
> + struct ocfs2_dlm_lksb *lksb = astarg;
>
> BUG_ON(o2cb_stack.sp_proto == NULL);
>
> @@ -179,7 +179,7 @@ static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
>
> static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
> {
> - union ocfs2_dlm_lksb *lksb = astarg;
> + struct ocfs2_dlm_lksb *lksb = astarg;
>
> int error = dlm_status_to_errno(status);
>
> @@ -204,7 +204,7 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
>
> static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
> int mode,
> - union ocfs2_dlm_lksb *lksb,
> + struct ocfs2_dlm_lksb *lksb,
> u32 flags,
> void *name,
> unsigned int namelen)
> @@ -223,7 +223,7 @@ static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
> }
>
> static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn,
> - union ocfs2_dlm_lksb *lksb,
> + struct ocfs2_dlm_lksb *lksb,
> u32 flags)
> {
> enum dlm_status status;
> @@ -236,7 +236,7 @@ static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn,
> return ret;
> }
>
> -static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
> +static int o2cb_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
> {
> return dlm_status_to_errno(lksb->lksb_o2dlm.status);
> }
> @@ -246,17 +246,17 @@ static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
> * contents, it will zero out the LVB. Thus the caller can always trust
> * the contents.
> */
> -static int o2cb_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb)
> +static int o2cb_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
> {
> return 1;
> }
>
> -static void *o2cb_dlm_lvb(union ocfs2_dlm_lksb *lksb)
> +static void *o2cb_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
> {
> return (void *)(lksb->lksb_o2dlm.lvb);
> }
>
> -static void o2cb_dump_lksb(union ocfs2_dlm_lksb *lksb)
> +static void o2cb_dump_lksb(struct ocfs2_dlm_lksb *lksb)
> {
> dlm_print_one_lock(lksb->lksb_o2dlm.lockid);
> }
> diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
> index 129b931..31276ba 100644
> --- a/fs/ocfs2/stack_user.c
> +++ b/fs/ocfs2/stack_user.c
> @@ -665,7 +665,7 @@ static void ocfs2_control_exit(void)
>
> static void fsdlm_lock_ast_wrapper(void *astarg)
> {
> - union ocfs2_dlm_lksb *lksb = astarg;
> + struct ocfs2_dlm_lksb *lksb = astarg;
> int status = lksb->lksb_fsdlm.sb_status;
>
> BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
> @@ -688,7 +688,7 @@ static void fsdlm_lock_ast_wrapper(void *astarg)
>
> static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
> {
> - union ocfs2_dlm_lksb *lksb = astarg;
> + struct ocfs2_dlm_lksb *lksb = astarg;
>
> BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
>
> @@ -697,7 +697,7 @@ static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
>
> static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
> int mode,
> - union ocfs2_dlm_lksb *lksb,
> + struct ocfs2_dlm_lksb *lksb,
> u32 flags,
> void *name,
> unsigned int namelen)
> @@ -716,7 +716,7 @@ static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
> }
>
> static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
> - union ocfs2_dlm_lksb *lksb,
> + struct ocfs2_dlm_lksb *lksb,
> u32 flags)
> {
> int ret;
> @@ -726,19 +726,19 @@ static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
> return ret;
> }
>
> -static int user_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
> +static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
> {
> return lksb->lksb_fsdlm.sb_status;
> }
>
> -static int user_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb)
> +static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
> {
> int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
>
> return !invalid;
> }
>
> -static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb)
> +static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
> {
> if (!lksb->lksb_fsdlm.sb_lvbptr)
> lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
> @@ -746,7 +746,7 @@ static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb)
> return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
> }
>
> -static void user_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
> +static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
> {
> }
>
> diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
> index 3500d98..8ef9a57 100644
> --- a/fs/ocfs2/stackglue.c
> +++ b/fs/ocfs2/stackglue.c
> @@ -240,47 +240,52 @@ EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol);
> */
> int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
> int mode,
> - union ocfs2_dlm_lksb *lksb,
> + struct ocfs2_dlm_lksb *lksb,
> u32 flags,
> void *name,
> unsigned int namelen)
> {
> BUG_ON(lproto == NULL);
>
> + if (!lksb->lksb_conn)
> + lksb->lksb_conn = conn;
> + else
> + BUG_ON(lksb->lksb_conn != conn);
> return active_stack->sp_ops->dlm_lock(conn, mode, lksb, flags,
> name, namelen);
> }
> EXPORT_SYMBOL_GPL(ocfs2_dlm_lock);
>
> int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
> - union ocfs2_dlm_lksb *lksb,
> + struct ocfs2_dlm_lksb *lksb,
> u32 flags)
> {
> BUG_ON(lproto == NULL);
> + BUG_ON(lksb->lksb_conn == NULL);
>
> return active_stack->sp_ops->dlm_unlock(conn, lksb, flags);
> }
> EXPORT_SYMBOL_GPL(ocfs2_dlm_unlock);
>
> -int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
> +int ocfs2_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
> {
> return active_stack->sp_ops->lock_status(lksb);
> }
> EXPORT_SYMBOL_GPL(ocfs2_dlm_lock_status);
>
> -int ocfs2_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb)
> +int ocfs2_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
> {
> return active_stack->sp_ops->lvb_valid(lksb);
> }
> EXPORT_SYMBOL_GPL(ocfs2_dlm_lvb_valid);
>
> -void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb)
> +void *ocfs2_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
> {
> return active_stack->sp_ops->lock_lvb(lksb);
> }
> EXPORT_SYMBOL_GPL(ocfs2_dlm_lvb);
>
> -void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
> +void ocfs2_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
> {
> active_stack->sp_ops->dump_lksb(lksb);
> }
> diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
> index d699117..bb32926 100644
> --- a/fs/ocfs2/stackglue.h
> +++ b/fs/ocfs2/stackglue.h
> @@ -70,10 +70,14 @@ struct fsdlm_lksb_plus_lvb {
> * size of the union is known. Lock status structures are embedded in
> * ocfs2 inodes.
> */
> -union ocfs2_dlm_lksb {
> - struct dlm_lockstatus lksb_o2dlm;
> - struct dlm_lksb lksb_fsdlm;
> - struct fsdlm_lksb_plus_lvb padding;
> +struct ocfs2_cluster_connection;
> +struct ocfs2_dlm_lksb {
> + union {
> + struct dlm_lockstatus lksb_o2dlm;
> + struct dlm_lksb lksb_fsdlm;
> + struct fsdlm_lksb_plus_lvb padding;
> + };
> + struct ocfs2_cluster_connection *lksb_conn;
> };
>
> /*
> @@ -81,9 +85,9 @@ union ocfs2_dlm_lksb {
> */
> struct ocfs2_locking_protocol {
> struct ocfs2_protocol_version lp_max_version;
> - void (*lp_lock_ast)(union ocfs2_dlm_lksb *lksb);
> - void (*lp_blocking_ast)(union ocfs2_dlm_lksb *lksb, int level);
> - void (*lp_unlock_ast)(union ocfs2_dlm_lksb *lksb, int error);
> + void (*lp_lock_ast)(struct ocfs2_dlm_lksb *lksb);
> + void (*lp_blocking_ast)(struct ocfs2_dlm_lksb *lksb, int level);
> + void (*lp_unlock_ast)(struct ocfs2_dlm_lksb *lksb, int error);
> };
>
>
> @@ -161,7 +165,7 @@ struct ocfs2_stack_operations {
> */
> int (*dlm_lock)(struct ocfs2_cluster_connection *conn,
> int mode,
> - union ocfs2_dlm_lksb *lksb,
> + struct ocfs2_dlm_lksb *lksb,
> u32 flags,
> void *name,
> unsigned int namelen);
> @@ -176,7 +180,7 @@ struct ocfs2_stack_operations {
> * function. The caller can use this to find their object.
> */
> int (*dlm_unlock)(struct ocfs2_cluster_connection *conn,
> - union ocfs2_dlm_lksb *lksb,
> + struct ocfs2_dlm_lksb *lksb,
> u32 flags);
>
> /*
> @@ -185,17 +189,17 @@ struct ocfs2_stack_operations {
> * callback pulls out the stack-specific lksb, converts the status
> * to a proper errno, and returns it.
> */
> - int (*lock_status)(union ocfs2_dlm_lksb *lksb);
> + int (*lock_status)(struct ocfs2_dlm_lksb *lksb);
>
> /*
> * Return non-zero if the LVB is valid.
> */
> - int (*lvb_valid)(union ocfs2_dlm_lksb *lksb);
> + int (*lvb_valid)(struct ocfs2_dlm_lksb *lksb);
>
> /*
> * Pull the lvb pointer off of the stack-specific lksb.
> */
> - void *(*lock_lvb)(union ocfs2_dlm_lksb *lksb);
> + void *(*lock_lvb)(struct ocfs2_dlm_lksb *lksb);
>
> /*
> * Cluster-aware posix locks
> @@ -212,7 +216,7 @@ struct ocfs2_stack_operations {
> * This is an optoinal debugging hook. If provided, the
> * stack can dump debugging information about this lock.
> */
> - void (*dump_lksb)(union ocfs2_dlm_lksb *lksb);
> + void (*dump_lksb)(struct ocfs2_dlm_lksb *lksb);
> };
>
> /*
> @@ -248,18 +252,18 @@ int ocfs2_cluster_this_node(unsigned int *node);
> struct ocfs2_lock_res;
> int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
> int mode,
> - union ocfs2_dlm_lksb *lksb,
> + struct ocfs2_dlm_lksb *lksb,
> u32 flags,
> void *name,
> unsigned int namelen);
> int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
> - union ocfs2_dlm_lksb *lksb,
> + struct ocfs2_dlm_lksb *lksb,
> u32 flags);
>
> -int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb);
> -int ocfs2_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb);
> -void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb);
> -void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb);
> +int ocfs2_dlm_lock_status(struct ocfs2_dlm_lksb *lksb);
> +int ocfs2_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb);
> +void *ocfs2_dlm_lvb(struct ocfs2_dlm_lksb *lksb);
> +void ocfs2_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb);
>
> int ocfs2_stack_supports_plocks(void);
> int ocfs2_plock(struct ocfs2_cluster_connection *conn, u64 ino,
>

2010-02-13 00:01:37

by Sunil Mushran

[permalink] [raw]
Subject: Re: [Ocfs2-devel] [PATCH 06/11] ocfs2: Hang the locking proto on the cluster conn and use it in asts.

Acked-by: Sunil Mushran <[email protected]>

Joel Becker wrote:
> With the ocfs2_cluster_connection hanging off of the ocfs2_dlm_lksb, we
> have access to it in the ast and bast wrapper functions. Attach the
> ocfs2_locking_protocol to the conn.
>
> Now, instead of refering to a static variable for ast/bast pointers, the
> wrappers can look at the connection. This means different connections
> can have different ast/bast pointers, and it reduces the need for the
> static pointer.
>
> Signed-off-by: Joel Becker <[email protected]>
> ---
> fs/ocfs2/stack_o2cb.c | 15 ++++-----------
> fs/ocfs2/stack_user.c | 10 +++-------
> fs/ocfs2/stackglue.c | 1 +
> fs/ocfs2/stackglue.h | 1 +
> 4 files changed, 9 insertions(+), 18 deletions(-)
>
> diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c
> index 41e6bad..abf696a 100644
> --- a/fs/ocfs2/stack_o2cb.c
> +++ b/fs/ocfs2/stack_o2cb.c
> @@ -163,28 +163,21 @@ static void o2dlm_lock_ast_wrapper(void *astarg)
> {
> struct ocfs2_dlm_lksb *lksb = astarg;
>
> - BUG_ON(o2cb_stack.sp_proto == NULL);
> -
> - o2cb_stack.sp_proto->lp_lock_ast(lksb);
> + lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
> }
>
> static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
> {
> struct ocfs2_dlm_lksb *lksb = astarg;
>
> - BUG_ON(o2cb_stack.sp_proto == NULL);
> -
> - o2cb_stack.sp_proto->lp_blocking_ast(lksb, level);
> + lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
> }
>
> static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
> {
> struct ocfs2_dlm_lksb *lksb = astarg;
> -
> int error = dlm_status_to_errno(status);
>
> - BUG_ON(o2cb_stack.sp_proto == NULL);
> -
> /*
> * In o2dlm, you can get both the lock_ast() for the lock being
> * granted and the unlock_ast() for the CANCEL failing. A
> @@ -199,7 +192,7 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
> if (status == DLM_CANCELGRANT)
> return;
>
> - o2cb_stack.sp_proto->lp_unlock_ast(lksb, error);
> + lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, error);
> }
>
> static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
> @@ -284,7 +277,7 @@ static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn)
> struct dlm_protocol_version dlm_version;
>
> BUG_ON(conn == NULL);
> - BUG_ON(o2cb_stack.sp_proto == NULL);
> + BUG_ON(conn->cc_proto == NULL);
>
> /* for now we only have one cluster/node, make sure we see it
> * in the heartbeat universe */
> diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
> index 31276ba..b4cf616 100644
> --- a/fs/ocfs2/stack_user.c
> +++ b/fs/ocfs2/stack_user.c
> @@ -668,8 +668,6 @@ static void fsdlm_lock_ast_wrapper(void *astarg)
> struct ocfs2_dlm_lksb *lksb = astarg;
> int status = lksb->lksb_fsdlm.sb_status;
>
> - BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
> -
> /*
> * For now we're punting on the issue of other non-standard errors
> * where we can't tell if the unlock_ast or lock_ast should be called.
> @@ -681,18 +679,16 @@ static void fsdlm_lock_ast_wrapper(void *astarg)
> */
>
> if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
> - ocfs2_user_plugin.sp_proto->lp_unlock_ast(lksb, 0);
> + lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
> else
> - ocfs2_user_plugin.sp_proto->lp_lock_ast(lksb);
> + lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
> }
>
> static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
> {
> struct ocfs2_dlm_lksb *lksb = astarg;
>
> - BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
> -
> - ocfs2_user_plugin.sp_proto->lp_blocking_ast(lksb, level);
> + lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
> }
>
> static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
> diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
> index 8ef9a57..010ecab 100644
> --- a/fs/ocfs2/stackglue.c
> +++ b/fs/ocfs2/stackglue.c
> @@ -343,6 +343,7 @@ int ocfs2_cluster_connect(const char *stack_name,
> new_conn->cc_recovery_handler = recovery_handler;
> new_conn->cc_recovery_data = recovery_data;
>
> + new_conn->cc_proto = lproto;
> /* Start the new connection at our maximum compatibility level */
> new_conn->cc_version = lproto->lp_max_version;
>
> diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
> index bb32926..cf8bac2 100644
> --- a/fs/ocfs2/stackglue.h
> +++ b/fs/ocfs2/stackglue.h
> @@ -100,6 +100,7 @@ struct ocfs2_cluster_connection {
> char cc_name[GROUP_NAME_MAX];
> int cc_namelen;
> struct ocfs2_protocol_version cc_version;
> + struct ocfs2_locking_protocol *cc_proto;
> void (*cc_recovery_handler)(int node_num, void *recovery_data);
> void *cc_recovery_data;
> void *cc_lockspace;
>

2010-02-13 01:11:51

by Sunil Mushran

[permalink] [raw]
Subject: Re: [Ocfs2-devel] [PATCH 07/11] ocfs2: Remove the ast pointers from ocfs2_stack_plugins

Acked-by: Sunil Mushran <[email protected]>

Joel Becker wrote:
> With the full ocfs2_locking_protocol hanging off of the
> ocfs2_cluster_connection, ast wrappers can get the ast/bast pointers
> there. They don't need to get them from their plugin structure.
>
> The user plugin still needs the maximum locking protocol version,
> though. This changes the plugin structure so that it only holds the max
> version, not the entire ocfs2_locking_protocol pointer.
>
> Signed-off-by: Joel Becker <[email protected]>
> ---
> fs/ocfs2/stack_user.c | 6 +++---
> fs/ocfs2/stackglue.c | 4 ++--
> fs/ocfs2/stackglue.h | 2 +-
> 3 files changed, 6 insertions(+), 6 deletions(-)
>
> diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
> index b4cf616..5ae8812 100644
> --- a/fs/ocfs2/stack_user.c
> +++ b/fs/ocfs2/stack_user.c
> @@ -62,8 +62,8 @@
> * negotiated by the client. The client negotiates based on the maximum
> * version advertised in /sys/fs/ocfs2/max_locking_protocol. The major
> * number from the "SETV" message must match
> - * ocfs2_user_plugin.sp_proto->lp_max_version.pv_major, and the minor number
> - * must be less than or equal to ...->lp_max_version.pv_minor.
> + * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
> + * must be less than or equal to ...sp_max_version.pv_minor.
> *
> * Once this information has been set, mounts will be allowed. From this
> * point on, the "DOWN" message can be sent for node down notification.
> @@ -400,7 +400,7 @@ static int ocfs2_control_do_setversion_msg(struct file *file,
> char *ptr = NULL;
> struct ocfs2_control_private *p = file->private_data;
> struct ocfs2_protocol_version *max =
> - &ocfs2_user_plugin.sp_proto->lp_max_version;
> + &ocfs2_user_plugin.sp_max_proto;
>
> if (ocfs2_control_get_handshake_state(file) !=
> OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
> diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
> index 010ecab..fc184c7 100644
> --- a/fs/ocfs2/stackglue.c
> +++ b/fs/ocfs2/stackglue.c
> @@ -176,7 +176,7 @@ int ocfs2_stack_glue_register(struct ocfs2_stack_plugin *plugin)
> spin_lock(&ocfs2_stack_lock);
> if (!ocfs2_stack_lookup(plugin->sp_name)) {
> plugin->sp_count = 0;
> - plugin->sp_proto = lproto;
> + plugin->sp_max_proto = lproto->lp_max_version;
> list_add(&plugin->sp_list, &ocfs2_stack_list);
> printk(KERN_INFO "ocfs2: Registered cluster interface %s\n",
> plugin->sp_name);
> @@ -224,7 +224,7 @@ void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)
>
> lproto = proto;
> list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
> - p->sp_proto = lproto;
> + p->sp_max_proto = lproto->lp_max_version;
> }
>
> spin_unlock(&ocfs2_stack_lock);
> diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
> index cf8bac2..77a7a9a 100644
> --- a/fs/ocfs2/stackglue.h
> +++ b/fs/ocfs2/stackglue.h
> @@ -233,7 +233,7 @@ struct ocfs2_stack_plugin {
> /* These are managed by the stackglue code. */
> struct list_head sp_list;
> unsigned int sp_count;
> - struct ocfs2_locking_protocol *sp_proto;
> + struct ocfs2_protocol_version sp_max_proto;
> };
>
>
>

2010-02-27 00:10:37

by Sunil Mushran

[permalink] [raw]
Subject: Re: [Ocfs2-devel] [PATCH 08/11] ocfs2: Pass the locking protocol into ocfs2_cluster_connect().

Signed-off-by: Sunil Mushran <[email protected]>


Joel Becker wrote:
> Inside the stackglue, the locking protocol structure is hanging off of
> the ocfs2_cluster_connection. This takes it one further; the locking
> protocol is passed into ocfs2_cluster_connect(). Now different cluster
> connections can have different locking protocols with distinct asts.
> Note that all locking protocols have to keep their maximum protocol
> version in lock-step.
>
> With the protocol structure set in ocfs2_cluster_connect(), there is no
> need for the stackglue to have a static pointer to a specific protocol
> structure. We can change initialization to only pass in the maximum
> protocol version.
>
> Signed-off-by: Joel Becker <[email protected]>
> ---
> fs/ocfs2/dlmglue.c | 168 +++++++++++++++++++++++++-------------------------
> fs/ocfs2/stackglue.c | 43 ++++++++-----
> fs/ocfs2/stackglue.h | 3 +-
> 3 files changed, 110 insertions(+), 104 deletions(-)
>
> diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
> index 4cb3ac2..dde55c4 100644
> --- a/fs/ocfs2/dlmglue.c
> +++ b/fs/ocfs2/dlmglue.c
> @@ -1036,7 +1036,6 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
> return lockres->l_pending_gen;
> }
>
> -
> static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
> {
> struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> @@ -1130,6 +1129,88 @@ out:
> spin_unlock_irqrestore(&lockres->l_lock, flags);
> }
>
> +static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
> +{
> + struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> + unsigned long flags;
> +
> + mlog_entry_void();
> +
> + mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
> + lockres->l_unlock_action);
> +
> + spin_lock_irqsave(&lockres->l_lock, flags);
> + if (error) {
> + mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
> + "unlock_action %d\n", error, lockres->l_name,
> + lockres->l_unlock_action);
> + spin_unlock_irqrestore(&lockres->l_lock, flags);
> + mlog_exit_void();
> + return;
> + }
> +
> + switch(lockres->l_unlock_action) {
> + case OCFS2_UNLOCK_CANCEL_CONVERT:
> + mlog(0, "Cancel convert success for %s\n", lockres->l_name);
> + lockres->l_action = OCFS2_AST_INVALID;
> + /* Downconvert thread may have requeued this lock, we
> + * need to wake it. */
> + if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
> + ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
> + break;
> + case OCFS2_UNLOCK_DROP_LOCK:
> + lockres->l_level = DLM_LOCK_IV;
> + break;
> + default:
> + BUG();
> + }
> +
> + lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
> + lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
> + wake_up(&lockres->l_event);
> + spin_unlock_irqrestore(&lockres->l_lock, flags);
> +
> + mlog_exit_void();
> +}
> +
> +/*
> + * This is the filesystem locking protocol. It provides the lock handling
> + * hooks for the underlying DLM. It has a maximum version number.
> + * The version number allows interoperability with systems running at
> + * the same major number and an equal or smaller minor number.
> + *
> + * Whenever the filesystem does new things with locks (adds or removes a
> + * lock, orders them differently, does different things underneath a lock),
> + * the version must be changed. The protocol is negotiated when joining
> + * the dlm domain. A node may join the domain if its major version is
> + * identical to all other nodes and its minor version is greater than
> + * or equal to all other nodes. When its minor version is greater than
> + * the other nodes, it will run at the minor version specified by the
> + * other nodes.
> + *
> + * If a locking change is made that will not be compatible with older
> + * versions, the major number must be increased and the minor version set
> + * to zero. If a change merely adds a behavior that can be disabled when
> + * speaking to older versions, the minor version must be increased. If a
> + * change adds a fully backwards compatible change (eg, LVB changes that
> + * are just ignored by older versions), the version does not need to be
> + * updated.
> + */
> +static struct ocfs2_locking_protocol lproto = {
> + .lp_max_version = {
> + .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
> + .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
> + },
> + .lp_lock_ast = ocfs2_locking_ast,
> + .lp_blocking_ast = ocfs2_blocking_ast,
> + .lp_unlock_ast = ocfs2_unlock_ast,
> +};
> +
> +void ocfs2_set_locking_protocol(void)
> +{
> + ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
> +}
> +
> static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
> int convert)
> {
> @@ -2959,7 +3040,7 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
> status = ocfs2_cluster_connect(osb->osb_cluster_stack,
> osb->uuid_str,
> strlen(osb->uuid_str),
> - ocfs2_do_node_down, osb,
> + &lproto, ocfs2_do_node_down, osb,
> &conn);
> if (status) {
> mlog_errno(status);
> @@ -3026,50 +3107,6 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
> mlog_exit_void();
> }
>
> -static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
> -{
> - struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> - unsigned long flags;
> -
> - mlog_entry_void();
> -
> - mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
> - lockres->l_unlock_action);
> -
> - spin_lock_irqsave(&lockres->l_lock, flags);
> - if (error) {
> - mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
> - "unlock_action %d\n", error, lockres->l_name,
> - lockres->l_unlock_action);
> - spin_unlock_irqrestore(&lockres->l_lock, flags);
> - mlog_exit_void();
> - return;
> - }
> -
> - switch(lockres->l_unlock_action) {
> - case OCFS2_UNLOCK_CANCEL_CONVERT:
> - mlog(0, "Cancel convert success for %s\n", lockres->l_name);
> - lockres->l_action = OCFS2_AST_INVALID;
> - /* Downconvert thread may have requeued this lock, we
> - * need to wake it. */
> - if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
> - ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
> - break;
> - case OCFS2_UNLOCK_DROP_LOCK:
> - lockres->l_level = DLM_LOCK_IV;
> - break;
> - default:
> - BUG();
> - }
> -
> - lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
> - lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
> - wake_up(&lockres->l_event);
> - spin_unlock_irqrestore(&lockres->l_lock, flags);
> -
> - mlog_exit_void();
> -}
> -
> static int ocfs2_drop_lock(struct ocfs2_super *osb,
> struct ocfs2_lock_res *lockres)
> {
> @@ -3843,45 +3880,6 @@ void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
> ocfs2_cluster_unlock(osb, lockres, level);
> }
>
> -/*
> - * This is the filesystem locking protocol. It provides the lock handling
> - * hooks for the underlying DLM. It has a maximum version number.
> - * The version number allows interoperability with systems running at
> - * the same major number and an equal or smaller minor number.
> - *
> - * Whenever the filesystem does new things with locks (adds or removes a
> - * lock, orders them differently, does different things underneath a lock),
> - * the version must be changed. The protocol is negotiated when joining
> - * the dlm domain. A node may join the domain if its major version is
> - * identical to all other nodes and its minor version is greater than
> - * or equal to all other nodes. When its minor version is greater than
> - * the other nodes, it will run at the minor version specified by the
> - * other nodes.
> - *
> - * If a locking change is made that will not be compatible with older
> - * versions, the major number must be increased and the minor version set
> - * to zero. If a change merely adds a behavior that can be disabled when
> - * speaking to older versions, the minor version must be increased. If a
> - * change adds a fully backwards compatible change (eg, LVB changes that
> - * are just ignored by older versions), the version does not need to be
> - * updated.
> - */
> -static struct ocfs2_locking_protocol lproto = {
> - .lp_max_version = {
> - .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
> - .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
> - },
> - .lp_lock_ast = ocfs2_locking_ast,
> - .lp_blocking_ast = ocfs2_blocking_ast,
> - .lp_unlock_ast = ocfs2_unlock_ast,
> -};
> -
> -void ocfs2_set_locking_protocol(void)
> -{
> - ocfs2_stack_glue_set_locking_protocol(&lproto);
> -}
> -
> -
> static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
> struct ocfs2_lock_res *lockres)
> {
> diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
> index fc184c7..31db2e8 100644
> --- a/fs/ocfs2/stackglue.c
> +++ b/fs/ocfs2/stackglue.c
> @@ -36,7 +36,7 @@
> #define OCFS2_STACK_PLUGIN_USER "user"
> #define OCFS2_MAX_HB_CTL_PATH 256
>
> -static struct ocfs2_locking_protocol *lproto;
> +static struct ocfs2_protocol_version locking_max_version;
> static DEFINE_SPINLOCK(ocfs2_stack_lock);
> static LIST_HEAD(ocfs2_stack_list);
> static char cluster_stack_name[OCFS2_STACK_LABEL_LEN + 1];
> @@ -176,7 +176,7 @@ int ocfs2_stack_glue_register(struct ocfs2_stack_plugin *plugin)
> spin_lock(&ocfs2_stack_lock);
> if (!ocfs2_stack_lookup(plugin->sp_name)) {
> plugin->sp_count = 0;
> - plugin->sp_max_proto = lproto->lp_max_version;
> + plugin->sp_max_proto = locking_max_version;
> list_add(&plugin->sp_list, &ocfs2_stack_list);
> printk(KERN_INFO "ocfs2: Registered cluster interface %s\n",
> plugin->sp_name);
> @@ -213,23 +213,23 @@ void ocfs2_stack_glue_unregister(struct ocfs2_stack_plugin *plugin)
> }
> EXPORT_SYMBOL_GPL(ocfs2_stack_glue_unregister);
>
> -void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)
> +void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto)
> {
> struct ocfs2_stack_plugin *p;
>
> - BUG_ON(proto == NULL);
> -
> spin_lock(&ocfs2_stack_lock);
> - BUG_ON(active_stack != NULL);
> + if (memcmp(max_proto, &locking_max_version,
> + sizeof(struct ocfs2_protocol_version))) {
> + BUG_ON(locking_max_version.pv_major != 0);
>
> - lproto = proto;
> - list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
> - p->sp_max_proto = lproto->lp_max_version;
> + locking_max_version = *max_proto;
> + list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
> + p->sp_max_proto = locking_max_version;
> + }
> }
> -
> spin_unlock(&ocfs2_stack_lock);
> }
> -EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol);
> +EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_max_proto_version);
>
>
> /*
> @@ -245,8 +245,6 @@ int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
> void *name,
> unsigned int namelen)
> {
> - BUG_ON(lproto == NULL);
> -
> if (!lksb->lksb_conn)
> lksb->lksb_conn = conn;
> else
> @@ -260,7 +258,6 @@ int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
> struct ocfs2_dlm_lksb *lksb,
> u32 flags)
> {
> - BUG_ON(lproto == NULL);
> BUG_ON(lksb->lksb_conn == NULL);
>
> return active_stack->sp_ops->dlm_unlock(conn, lksb, flags);
> @@ -314,6 +311,7 @@ EXPORT_SYMBOL_GPL(ocfs2_plock);
> int ocfs2_cluster_connect(const char *stack_name,
> const char *group,
> int grouplen,
> + struct ocfs2_locking_protocol *lproto,
> void (*recovery_handler)(int node_num,
> void *recovery_data),
> void *recovery_data,
> @@ -331,6 +329,12 @@ int ocfs2_cluster_connect(const char *stack_name,
> goto out;
> }
>
> + if (memcmp(&lproto->lp_max_version, &locking_max_version,
> + sizeof(struct ocfs2_protocol_version))) {
> + rc = -EINVAL;
> + goto out;
> + }
> +
> new_conn = kzalloc(sizeof(struct ocfs2_cluster_connection),
> GFP_KERNEL);
> if (!new_conn) {
> @@ -456,10 +460,10 @@ static ssize_t ocfs2_max_locking_protocol_show(struct kobject *kobj,
> ssize_t ret = 0;
>
> spin_lock(&ocfs2_stack_lock);
> - if (lproto)
> + if (locking_max_version.pv_major)
> ret = snprintf(buf, PAGE_SIZE, "%u.%u\n",
> - lproto->lp_max_version.pv_major,
> - lproto->lp_max_version.pv_minor);
> + locking_max_version.pv_major,
> + locking_max_version.pv_minor);
> spin_unlock(&ocfs2_stack_lock);
>
> return ret;
> @@ -688,7 +692,10 @@ static int __init ocfs2_stack_glue_init(void)
>
> static void __exit ocfs2_stack_glue_exit(void)
> {
> - lproto = NULL;
> + memset(&locking_max_version, 0,
> + sizeof(struct ocfs2_protocol_version));
> + locking_max_version.pv_major = 0;
> + locking_max_version.pv_minor = 0;
> ocfs2_sysfs_exit();
> if (ocfs2_table_header)
> unregister_sysctl_table(ocfs2_table_header);
> diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
> index 77a7a9a..b1981ba 100644
> --- a/fs/ocfs2/stackglue.h
> +++ b/fs/ocfs2/stackglue.h
> @@ -241,6 +241,7 @@ struct ocfs2_stack_plugin {
> int ocfs2_cluster_connect(const char *stack_name,
> const char *group,
> int grouplen,
> + struct ocfs2_locking_protocol *lproto,
> void (*recovery_handler)(int node_num,
> void *recovery_data),
> void *recovery_data,
> @@ -270,7 +271,7 @@ int ocfs2_stack_supports_plocks(void);
> int ocfs2_plock(struct ocfs2_cluster_connection *conn, u64 ino,
> struct file *file, int cmd, struct file_lock *fl);
>
> -void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto);
> +void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto);
>
>
> /* Used by stack plugins */
>

2010-02-27 00:12:11

by Sunil Mushran

[permalink] [raw]
Subject: Re: [Ocfs2-devel] [PATCH 09/11] ocfs2_dlmfs: Don't honor truncate. The size of a dlmfs file is LVB_LEN

Signed-off-by: Sunil Mushran <[email protected]>


Joel Becker wrote:
> We want folks using dlmfs to be able to use the LVB in places other than
> just write(2)/read(2). By ignoring truncate requests, we allow 'echo
> "contents" > /dlm/space/lockname' to work.
>
> Signed-off-by: Joel Becker <[email protected]>
> ---
> fs/ocfs2/dlmfs/dlmfs.c | 18 ++++++++++++++++++
> 1 files changed, 18 insertions(+), 0 deletions(-)
>
> diff --git a/fs/ocfs2/dlmfs/dlmfs.c b/fs/ocfs2/dlmfs/dlmfs.c
> index e21ce0e..13ac2bf 100644
> --- a/fs/ocfs2/dlmfs/dlmfs.c
> +++ b/fs/ocfs2/dlmfs/dlmfs.c
> @@ -220,6 +220,23 @@ static int dlmfs_file_release(struct inode *inode,
> return 0;
> }
>
> +/*
> + * We do ->setattr() just to override size changes. Our size is the size
> + * of the LVB and nothing else.
> + */
> +static int dlmfs_file_setattr(struct dentry *dentry, struct iattr *attr)
> +{
> + int error;
> + struct inode *inode = dentry->d_inode;
> +
> + attr->ia_valid &= ~ATTR_SIZE;
> + error = inode_change_ok(inode, attr);
> + if (!error)
> + error = inode_setattr(inode, attr);
> +
> + return error;
> +}
> +
> static unsigned int dlmfs_file_poll(struct file *file, poll_table *wait)
> {
> int event = 0;
> @@ -634,6 +651,7 @@ static const struct super_operations dlmfs_ops = {
>
> static const struct inode_operations dlmfs_file_inode_operations = {
> .getattr = simple_getattr,
> + .setattr = dlmfs_file_setattr,
> };
>
> static int dlmfs_get_sb(struct file_system_type *fs_type,
>

2010-02-27 00:28:27

by Sunil Mushran

[permalink] [raw]
Subject: Re: [Ocfs2-devel] [PATCH 10/11] ocfs2_dlmfs: Use the stackglue.

Signed-off-by: Sunil Mushranr <[email protected]>


Joel Becker wrote:
> Rather than directly using o2dlm, dlmfs can now use the stackglue. This
> allows it to use userspace cluster stacks and fs/dlm. This commit
> forces o2cb for now. A latter commit will bump the protocol version and
> allow non-o2cb stacks.
>
> This is one big sed, really. LKM_xxMODE becomes DLM_LOCK_xx. LKM_flag
> becomes DLM_LKF_flag.
>
> We also learn to check that the LVB is valid before reading it. Any DLM
> can lose the contents of the LVB during a complicated recovery. userdlm
> should be checking this. Now it does. dlmfs will return 0 from read(2)
> if the LVB was invalid.
>
> Signed-off-by: Joel Becker <[email protected]>
> ---
> fs/ocfs2/dlmfs/dlmfs.c | 57 ++++------
> fs/ocfs2/dlmfs/userdlm.c | 266 +++++++++++++++++++++++----------------------
> fs/ocfs2/dlmfs/userdlm.h | 16 ++--
> 3 files changed, 166 insertions(+), 173 deletions(-)
>
> diff --git a/fs/ocfs2/dlmfs/dlmfs.c b/fs/ocfs2/dlmfs/dlmfs.c
> index 13ac2bf..8697366 100644
> --- a/fs/ocfs2/dlmfs/dlmfs.c
> +++ b/fs/ocfs2/dlmfs/dlmfs.c
> @@ -47,21 +47,13 @@
>
> #include <asm/uaccess.h>
>
> -
> -#include "cluster/nodemanager.h"
> -#include "cluster/heartbeat.h"
> -#include "cluster/tcp.h"
> -
> -#include "dlm/dlmapi.h"
> -
> +#include "stackglue.h"
> #include "userdlm.h"
> -
> #include "dlmfsver.h"
>
> #define MLOG_MASK_PREFIX ML_DLMFS
> #include "cluster/masklog.h"
>
> -#include "ocfs2_lockingver.h"
>
> static const struct super_operations dlmfs_ops;
> static const struct file_operations dlmfs_file_operations;
> @@ -72,15 +64,6 @@ static struct kmem_cache *dlmfs_inode_cache;
>
> struct workqueue_struct *user_dlm_worker;
>
> -/*
> - * This is the userdlmfs locking protocol version.
> - *
> - * See fs/ocfs2/dlmglue.c for more details on locking versions.
> - */
> -static const struct dlm_protocol_version user_locking_protocol = {
> - .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
> - .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
> -};
>
>
> /*
> @@ -259,7 +242,7 @@ static ssize_t dlmfs_file_read(struct file *filp,
> loff_t *ppos)
> {
> int bytes_left;
> - ssize_t readlen;
> + ssize_t readlen, got;
> char *lvb_buf;
> struct inode *inode = filp->f_path.dentry->d_inode;
>
> @@ -285,9 +268,13 @@ static ssize_t dlmfs_file_read(struct file *filp,
> if (!lvb_buf)
> return -ENOMEM;
>
> - user_dlm_read_lvb(inode, lvb_buf, readlen);
> - bytes_left = __copy_to_user(buf, lvb_buf, readlen);
> - readlen -= bytes_left;
> + got = user_dlm_read_lvb(inode, lvb_buf, readlen);
> + if (got) {
> + BUG_ON(got != readlen);
> + bytes_left = __copy_to_user(buf, lvb_buf, readlen);
> + readlen -= bytes_left;
> + } else
> + readlen = 0;
>
> kfree(lvb_buf);
>
> @@ -346,7 +333,7 @@ static void dlmfs_init_once(void *foo)
> struct dlmfs_inode_private *ip =
> (struct dlmfs_inode_private *) foo;
>
> - ip->ip_dlm = NULL;
> + ip->ip_conn = NULL;
> ip->ip_parent = NULL;
>
> inode_init_once(&ip->ip_vfs_inode);
> @@ -388,14 +375,14 @@ static void dlmfs_clear_inode(struct inode *inode)
> goto clear_fields;
> }
>
> - mlog(0, "we're a directory, ip->ip_dlm = 0x%p\n", ip->ip_dlm);
> + mlog(0, "we're a directory, ip->ip_conn = 0x%p\n", ip->ip_conn);
> /* we must be a directory. If required, lets unregister the
> * dlm context now. */
> - if (ip->ip_dlm)
> - user_dlm_unregister_context(ip->ip_dlm);
> + if (ip->ip_conn)
> + user_dlm_unregister(ip->ip_conn);
> clear_fields:
> ip->ip_parent = NULL;
> - ip->ip_dlm = NULL;
> + ip->ip_conn = NULL;
> }
>
> static struct backing_dev_info dlmfs_backing_dev_info = {
> @@ -445,7 +432,7 @@ static struct inode *dlmfs_get_inode(struct inode *parent,
> inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
>
> ip = DLMFS_I(inode);
> - ip->ip_dlm = DLMFS_I(parent)->ip_dlm;
> + ip->ip_conn = DLMFS_I(parent)->ip_conn;
>
> switch (mode & S_IFMT) {
> default:
> @@ -499,13 +486,12 @@ static int dlmfs_mkdir(struct inode * dir,
> struct inode *inode = NULL;
> struct qstr *domain = &dentry->d_name;
> struct dlmfs_inode_private *ip;
> - struct dlm_ctxt *dlm;
> - struct dlm_protocol_version proto = user_locking_protocol;
> + struct ocfs2_cluster_connection *conn;
>
> mlog(0, "mkdir %.*s\n", domain->len, domain->name);
>
> /* verify that we have a proper domain */
> - if (domain->len >= O2NM_MAX_NAME_LEN) {
> + if (domain->len >= GROUP_NAME_MAX) {
> status = -EINVAL;
> mlog(ML_ERROR, "invalid domain name for directory.\n");
> goto bail;
> @@ -520,14 +506,14 @@ static int dlmfs_mkdir(struct inode * dir,
>
> ip = DLMFS_I(inode);
>
> - dlm = user_dlm_register_context(domain, &proto);
> - if (IS_ERR(dlm)) {
> - status = PTR_ERR(dlm);
> + conn = user_dlm_register(domain);
> + if (IS_ERR(conn)) {
> + status = PTR_ERR(conn);
> mlog(ML_ERROR, "Error %d could not register domain \"%.*s\"\n",
> status, domain->len, domain->name);
> goto bail;
> }
> - ip->ip_dlm = dlm;
> + ip->ip_conn = conn;
>
> inc_nlink(dir);
> d_instantiate(dentry, inode);
> @@ -696,6 +682,7 @@ static int __init init_dlmfs_fs(void)
> }
> cleanup_worker = 1;
>
> + user_dlm_set_locking_protocol();
> status = register_filesystem(&dlmfs_fs_type);
> bail:
> if (status) {
> diff --git a/fs/ocfs2/dlmfs/userdlm.c b/fs/ocfs2/dlmfs/userdlm.c
> index 6adae70..c1b6a56 100644
> --- a/fs/ocfs2/dlmfs/userdlm.c
> +++ b/fs/ocfs2/dlmfs/userdlm.c
> @@ -34,18 +34,19 @@
> #include <linux/types.h>
> #include <linux/crc32.h>
>
> -
> -#include "cluster/nodemanager.h"
> -#include "cluster/heartbeat.h"
> -#include "cluster/tcp.h"
> -
> -#include "dlm/dlmapi.h"
> -
> +#include "ocfs2_lockingver.h"
> +#include "stackglue.h"
> #include "userdlm.h"
>
> #define MLOG_MASK_PREFIX ML_DLMFS
> #include "cluster/masklog.h"
>
> +
> +static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
> +{
> + return container_of(lksb, struct user_lock_res, l_lksb);
> +}
> +
> static inline int user_check_wait_flag(struct user_lock_res *lockres,
> int flag)
> {
> @@ -73,15 +74,15 @@ static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
> }
>
> /* I heart container_of... */
> -static inline struct dlm_ctxt *
> -dlm_ctxt_from_user_lockres(struct user_lock_res *lockres)
> +static inline struct ocfs2_cluster_connection *
> +cluster_connection_from_user_lockres(struct user_lock_res *lockres)
> {
> struct dlmfs_inode_private *ip;
>
> ip = container_of(lockres,
> struct dlmfs_inode_private,
> ip_lockres);
> - return ip->ip_dlm;
> + return ip->ip_conn;
> }
>
> static struct inode *
> @@ -103,9 +104,9 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
> }
>
> #define user_log_dlm_error(_func, _stat, _lockres) do { \
> - mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \
> - "resource %.*s: %s\n", dlm_errname(_stat), _func, \
> - _lockres->l_namelen, _lockres->l_name, dlm_errmsg(_stat)); \
> + mlog(ML_ERROR, "Dlm error %d while calling %s on " \
> + "resource %.*s\n", _stat, _func, \
> + _lockres->l_namelen, _lockres->l_name); \
> } while (0)
>
> /* WARNING: This function lives in a world where the only three lock
> @@ -113,34 +114,34 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
> * lock types are added. */
> static inline int user_highest_compat_lock_level(int level)
> {
> - int new_level = LKM_EXMODE;
> + int new_level = DLM_LOCK_EX;
>
> - if (level == LKM_EXMODE)
> - new_level = LKM_NLMODE;
> - else if (level == LKM_PRMODE)
> - new_level = LKM_PRMODE;
> + if (level == DLM_LOCK_EX)
> + new_level = DLM_LOCK_NL;
> + else if (level == DLM_LOCK_PR)
> + new_level = DLM_LOCK_PR;
> return new_level;
> }
>
> -static void user_ast(void *opaque)
> +static void user_ast(struct ocfs2_dlm_lksb *lksb)
> {
> - struct user_lock_res *lockres = opaque;
> - struct dlm_lockstatus *lksb;
> + struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
> + int status;
>
> mlog(0, "AST fired for lockres %.*s\n", lockres->l_namelen,
> lockres->l_name);
>
> spin_lock(&lockres->l_lock);
>
> - lksb = &(lockres->l_lksb);
> - if (lksb->status != DLM_NORMAL) {
> + status = ocfs2_dlm_lock_status(&lockres->l_lksb);
> + if (status) {
> mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
> - lksb->status, lockres->l_namelen, lockres->l_name);
> + status, lockres->l_namelen, lockres->l_name);
> spin_unlock(&lockres->l_lock);
> return;
> }
>
> - mlog_bug_on_msg(lockres->l_requested == LKM_IVMODE,
> + mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
> "Lockres %.*s, requested ivmode. flags 0x%x\n",
> lockres->l_namelen, lockres->l_name, lockres->l_flags);
>
> @@ -148,13 +149,13 @@ static void user_ast(void *opaque)
> if (lockres->l_requested < lockres->l_level) {
> if (lockres->l_requested <=
> user_highest_compat_lock_level(lockres->l_blocking)) {
> - lockres->l_blocking = LKM_NLMODE;
> + lockres->l_blocking = DLM_LOCK_NL;
> lockres->l_flags &= ~USER_LOCK_BLOCKED;
> }
> }
>
> lockres->l_level = lockres->l_requested;
> - lockres->l_requested = LKM_IVMODE;
> + lockres->l_requested = DLM_LOCK_IV;
> lockres->l_flags |= USER_LOCK_ATTACHED;
> lockres->l_flags &= ~USER_LOCK_BUSY;
>
> @@ -193,11 +194,11 @@ static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
> return;
>
> switch (lockres->l_blocking) {
> - case LKM_EXMODE:
> + case DLM_LOCK_EX:
> if (!lockres->l_ex_holders && !lockres->l_ro_holders)
> queue = 1;
> break;
> - case LKM_PRMODE:
> + case DLM_LOCK_PR:
> if (!lockres->l_ex_holders)
> queue = 1;
> break;
> @@ -209,9 +210,9 @@ static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
> __user_dlm_queue_lockres(lockres);
> }
>
> -static void user_bast(void *opaque, int level)
> +static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
> {
> - struct user_lock_res *lockres = opaque;
> + struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
>
> mlog(0, "Blocking AST fired for lockres %.*s. Blocking level %d\n",
> lockres->l_namelen, lockres->l_name, level);
> @@ -227,15 +228,15 @@ static void user_bast(void *opaque, int level)
> wake_up(&lockres->l_event);
> }
>
> -static void user_unlock_ast(void *opaque, enum dlm_status status)
> +static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
> {
> - struct user_lock_res *lockres = opaque;
> + struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
>
> mlog(0, "UNLOCK AST called on lock %.*s\n", lockres->l_namelen,
> lockres->l_name);
>
> - if (status != DLM_NORMAL && status != DLM_CANCELGRANT)
> - mlog(ML_ERROR, "Dlm returns status %d\n", status);
> + if (status)
> + mlog(ML_ERROR, "dlm returns status %d\n", status);
>
> spin_lock(&lockres->l_lock);
> /* The teardown flag gets set early during the unlock process,
> @@ -243,7 +244,7 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
> * for a concurrent cancel. */
> if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
> && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
> - lockres->l_level = LKM_IVMODE;
> + lockres->l_level = DLM_LOCK_IV;
> } else if (status == DLM_CANCELGRANT) {
> /* We tried to cancel a convert request, but it was
> * already granted. Don't clear the busy flag - the
> @@ -254,7 +255,7 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
> } else {
> BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
> /* Cancel succeeded, we want to re-queue */
> - lockres->l_requested = LKM_IVMODE; /* cancel an
> + lockres->l_requested = DLM_LOCK_IV; /* cancel an
> * upconvert
> * request. */
> lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
> @@ -271,6 +272,21 @@ out_noclear:
> wake_up(&lockres->l_event);
> }
>
> +/*
> + * This is the userdlmfs locking protocol version.
> + *
> + * See fs/ocfs2/dlmglue.c for more details on locking versions.
> + */
> +static struct ocfs2_locking_protocol user_dlm_lproto = {
> + .lp_max_version = {
> + .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
> + .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
> + },
> + .lp_lock_ast = user_ast,
> + .lp_blocking_ast = user_bast,
> + .lp_unlock_ast = user_unlock_ast,
> +};
> +
> static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
> {
> struct inode *inode;
> @@ -283,7 +299,8 @@ static void user_dlm_unblock_lock(struct work_struct *work)
> int new_level, status;
> struct user_lock_res *lockres =
> container_of(work, struct user_lock_res, l_work);
> - struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
> + struct ocfs2_cluster_connection *conn =
> + cluster_connection_from_user_lockres(lockres);
>
> mlog(0, "processing lockres %.*s\n", lockres->l_namelen,
> lockres->l_name);
> @@ -322,20 +339,17 @@ static void user_dlm_unblock_lock(struct work_struct *work)
> lockres->l_flags |= USER_LOCK_IN_CANCEL;
> spin_unlock(&lockres->l_lock);
>
> - status = dlmunlock(dlm,
> - &lockres->l_lksb,
> - LKM_CANCEL,
> - user_unlock_ast,
> - lockres);
> - if (status != DLM_NORMAL)
> - user_log_dlm_error("dlmunlock", status, lockres);
> + status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
> + DLM_LKF_CANCEL);
> + if (status)
> + user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
> goto drop_ref;
> }
>
> /* If there are still incompat holders, we can exit safely
> * without worrying about re-queueing this lock as that will
> * happen on the last call to user_cluster_unlock. */
> - if ((lockres->l_blocking == LKM_EXMODE)
> + if ((lockres->l_blocking == DLM_LOCK_EX)
> && (lockres->l_ex_holders || lockres->l_ro_holders)) {
> spin_unlock(&lockres->l_lock);
> mlog(0, "can't downconvert for ex: ro = %u, ex = %u\n",
> @@ -343,7 +357,7 @@ static void user_dlm_unblock_lock(struct work_struct *work)
> goto drop_ref;
> }
>
> - if ((lockres->l_blocking == LKM_PRMODE)
> + if ((lockres->l_blocking == DLM_LOCK_PR)
> && lockres->l_ex_holders) {
> spin_unlock(&lockres->l_lock);
> mlog(0, "can't downconvert for pr: ex = %u\n",
> @@ -360,17 +374,12 @@ static void user_dlm_unblock_lock(struct work_struct *work)
> spin_unlock(&lockres->l_lock);
>
> /* need lock downconvert request now... */
> - status = dlmlock(dlm,
> - new_level,
> - &lockres->l_lksb,
> - LKM_CONVERT|LKM_VALBLK,
> - lockres->l_name,
> - lockres->l_namelen,
> - user_ast,
> - lockres,
> - user_bast);
> - if (status != DLM_NORMAL) {
> - user_log_dlm_error("dlmlock", status, lockres);
> + status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
> + DLM_LKF_CONVERT|DLM_LKF_VALBLK,
> + lockres->l_name,
> + lockres->l_namelen);
> + if (status) {
> + user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
> user_recover_from_dlm_error(lockres);
> }
>
> @@ -382,10 +391,10 @@ static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
> int level)
> {
> switch(level) {
> - case LKM_EXMODE:
> + case DLM_LOCK_EX:
> lockres->l_ex_holders++;
> break;
> - case LKM_PRMODE:
> + case DLM_LOCK_PR:
> lockres->l_ro_holders++;
> break;
> default:
> @@ -410,10 +419,11 @@ int user_dlm_cluster_lock(struct user_lock_res *lockres,
> int lkm_flags)
> {
> int status, local_flags;
> - struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
> + struct ocfs2_cluster_connection *conn =
> + cluster_connection_from_user_lockres(lockres);
>
> - if (level != LKM_EXMODE &&
> - level != LKM_PRMODE) {
> + if (level != DLM_LOCK_EX &&
> + level != DLM_LOCK_PR) {
> mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
> lockres->l_namelen, lockres->l_name);
> status = -EINVAL;
> @@ -422,7 +432,7 @@ int user_dlm_cluster_lock(struct user_lock_res *lockres,
>
> mlog(0, "lockres %.*s: asking for %s lock, passed flags = 0x%x\n",
> lockres->l_namelen, lockres->l_name,
> - (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE",
> + (level == DLM_LOCK_EX) ? "DLM_LOCK_EX" : "DLM_LOCK_PR",
> lkm_flags);
>
> again:
> @@ -457,35 +467,26 @@ again:
> }
>
> if (level > lockres->l_level) {
> - local_flags = lkm_flags | LKM_VALBLK;
> - if (lockres->l_level != LKM_IVMODE)
> - local_flags |= LKM_CONVERT;
> + local_flags = lkm_flags | DLM_LKF_VALBLK;
> + if (lockres->l_level != DLM_LOCK_IV)
> + local_flags |= DLM_LKF_CONVERT;
>
> lockres->l_requested = level;
> lockres->l_flags |= USER_LOCK_BUSY;
> spin_unlock(&lockres->l_lock);
>
> - BUG_ON(level == LKM_IVMODE);
> - BUG_ON(level == LKM_NLMODE);
> + BUG_ON(level == DLM_LOCK_IV);
> + BUG_ON(level == DLM_LOCK_NL);
>
> /* call dlm_lock to upgrade lock now */
> - status = dlmlock(dlm,
> - level,
> - &lockres->l_lksb,
> - local_flags,
> - lockres->l_name,
> - lockres->l_namelen,
> - user_ast,
> - lockres,
> - user_bast);
> - if (status != DLM_NORMAL) {
> - if ((lkm_flags & LKM_NOQUEUE) &&
> - (status == DLM_NOTQUEUED))
> - status = -EAGAIN;
> - else {
> - user_log_dlm_error("dlmlock", status, lockres);
> - status = -EINVAL;
> - }
> + status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
> + local_flags, lockres->l_name,
> + lockres->l_namelen);
> + if (status) {
> + if ((lkm_flags & DLM_LKF_NOQUEUE) &&
> + (status != -EAGAIN))
> + user_log_dlm_error("ocfs2_dlm_lock",
> + status, lockres);
> user_recover_from_dlm_error(lockres);
> goto bail;
> }
> @@ -506,11 +507,11 @@ static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
> int level)
> {
> switch(level) {
> - case LKM_EXMODE:
> + case DLM_LOCK_EX:
> BUG_ON(!lockres->l_ex_holders);
> lockres->l_ex_holders--;
> break;
> - case LKM_PRMODE:
> + case DLM_LOCK_PR:
> BUG_ON(!lockres->l_ro_holders);
> lockres->l_ro_holders--;
> break;
> @@ -522,8 +523,8 @@ static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
> void user_dlm_cluster_unlock(struct user_lock_res *lockres,
> int level)
> {
> - if (level != LKM_EXMODE &&
> - level != LKM_PRMODE) {
> + if (level != DLM_LOCK_EX &&
> + level != DLM_LOCK_PR) {
> mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
> lockres->l_namelen, lockres->l_name);
> return;
> @@ -540,33 +541,40 @@ void user_dlm_write_lvb(struct inode *inode,
> unsigned int len)
> {
> struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
> - char *lvb = lockres->l_lksb.lvb;
> + char *lvb;
>
> BUG_ON(len > DLM_LVB_LEN);
>
> spin_lock(&lockres->l_lock);
>
> - BUG_ON(lockres->l_level < LKM_EXMODE);
> + BUG_ON(lockres->l_level < DLM_LOCK_EX);
> + lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
> memcpy(lvb, val, len);
>
> spin_unlock(&lockres->l_lock);
> }
>
> -void user_dlm_read_lvb(struct inode *inode,
> - char *val,
> - unsigned int len)
> +ssize_t user_dlm_read_lvb(struct inode *inode,
> + char *val,
> + unsigned int len)
> {
> struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
> - char *lvb = lockres->l_lksb.lvb;
> + char *lvb;
> + ssize_t ret = len;
>
> BUG_ON(len > DLM_LVB_LEN);
>
> spin_lock(&lockres->l_lock);
>
> - BUG_ON(lockres->l_level < LKM_PRMODE);
> - memcpy(val, lvb, len);
> + BUG_ON(lockres->l_level < DLM_LOCK_PR);
> + if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
> + lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
> + memcpy(val, lvb, len);
> + } else
> + ret = 0;
>
> spin_unlock(&lockres->l_lock);
> + return ret;
> }
>
> void user_dlm_lock_res_init(struct user_lock_res *lockres,
> @@ -576,9 +584,9 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres,
>
> spin_lock_init(&lockres->l_lock);
> init_waitqueue_head(&lockres->l_event);
> - lockres->l_level = LKM_IVMODE;
> - lockres->l_requested = LKM_IVMODE;
> - lockres->l_blocking = LKM_IVMODE;
> + lockres->l_level = DLM_LOCK_IV;
> + lockres->l_requested = DLM_LOCK_IV;
> + lockres->l_blocking = DLM_LOCK_IV;
>
> /* should have been checked before getting here. */
> BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);
> @@ -592,7 +600,8 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres,
> int user_dlm_destroy_lock(struct user_lock_res *lockres)
> {
> int status = -EBUSY;
> - struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
> + struct ocfs2_cluster_connection *conn =
> + cluster_connection_from_user_lockres(lockres);
>
> mlog(0, "asked to destroy %.*s\n", lockres->l_namelen, lockres->l_name);
>
> @@ -627,14 +636,9 @@ int user_dlm_destroy_lock(struct user_lock_res *lockres)
> lockres->l_flags |= USER_LOCK_BUSY;
> spin_unlock(&lockres->l_lock);
>
> - status = dlmunlock(dlm,
> - &lockres->l_lksb,
> - LKM_VALBLK,
> - user_unlock_ast,
> - lockres);
> - if (status != DLM_NORMAL) {
> - user_log_dlm_error("dlmunlock", status, lockres);
> - status = -EINVAL;
> + status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
> + if (status) {
> + user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
> goto bail;
> }
>
> @@ -645,32 +649,34 @@ bail:
> return status;
> }
>
> -struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
> - struct dlm_protocol_version *proto)
> +static void user_dlm_recovery_handler_noop(int node_num,
> + void *recovery_data)
> {
> - struct dlm_ctxt *dlm;
> - u32 dlm_key;
> - char *domain;
> -
> - domain = kmalloc(name->len + 1, GFP_NOFS);
> - if (!domain) {
> - mlog_errno(-ENOMEM);
> - return ERR_PTR(-ENOMEM);
> - }
> + /* We ignore recovery events */
> + return;
> +}
>
> - dlm_key = crc32_le(0, name->name, name->len);
> +void user_dlm_set_locking_protocol(void)
> +{
> + ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
> +}
>
> - snprintf(domain, name->len + 1, "%.*s", name->len, name->name);
> +struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name)
> +{
> + int rc;
> + struct ocfs2_cluster_connection *conn;
>
> - dlm = dlm_register_domain(domain, dlm_key, proto);
> - if (IS_ERR(dlm))
> - mlog_errno(PTR_ERR(dlm));
> + rc = ocfs2_cluster_connect("o2cb", name->name, name->len,
> + &user_dlm_lproto,
> + user_dlm_recovery_handler_noop,
> + NULL, &conn);
> + if (rc)
> + mlog_errno(rc);
>
> - kfree(domain);
> - return dlm;
> + return rc ? ERR_PTR(rc) : conn;
> }
>
> -void user_dlm_unregister_context(struct dlm_ctxt *dlm)
> +void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
> {
> - dlm_unregister_domain(dlm);
> + ocfs2_cluster_disconnect(conn, 0);
> }
> diff --git a/fs/ocfs2/dlmfs/userdlm.h b/fs/ocfs2/dlmfs/userdlm.h
> index 0c3cc03..3b42d79 100644
> --- a/fs/ocfs2/dlmfs/userdlm.h
> +++ b/fs/ocfs2/dlmfs/userdlm.h
> @@ -57,7 +57,7 @@ struct user_lock_res {
> int l_level;
> unsigned int l_ro_holders;
> unsigned int l_ex_holders;
> - struct dlm_lockstatus l_lksb;
> + struct ocfs2_dlm_lksb l_lksb;
>
> int l_requested;
> int l_blocking;
> @@ -80,15 +80,15 @@ void user_dlm_cluster_unlock(struct user_lock_res *lockres,
> void user_dlm_write_lvb(struct inode *inode,
> const char *val,
> unsigned int len);
> -void user_dlm_read_lvb(struct inode *inode,
> - char *val,
> - unsigned int len);
> -struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
> - struct dlm_protocol_version *proto);
> -void user_dlm_unregister_context(struct dlm_ctxt *dlm);
> +ssize_t user_dlm_read_lvb(struct inode *inode,
> + char *val,
> + unsigned int len);
> +struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name);
> +void user_dlm_unregister(struct ocfs2_cluster_connection *conn);
> +void user_dlm_set_locking_protocol(void);
>
> struct dlmfs_inode_private {
> - struct dlm_ctxt *ip_dlm;
> + struct ocfs2_cluster_connection *ip_conn;
>
> struct user_lock_res ip_lockres; /* unused for directories. */
> struct inode *ip_parent;
>