2014-06-30 15:53:17

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 00/18] RFC DRBD fixes

Hi,

this series has a number of fixes to DRBD in random places. The most
interesting change is "get rid of atomic update on disk bitmap works".

These patches are targeted at the 3.17 merge window, after review.

There will be one more post containing "random stuff", and a final
one that adds debug-fs support to DRBD.

best,
Phil

Lars Ellenberg (15):
drbd: reduce number of spinlock drop/re-aquire cycles
drbd: refactor use of first_peer_device()
drbd: allow write-ordering policy to be bumped up again
drbd: get rid of atomic update on disk bitmap works
drbd: fix a race stopping the worker thread
drbd: fix resync finished detection
drbd: stop the meta data sync timer before open coded meta data sync
drbd: re-add lost conf_mutex protection in drbd_set_role
drbd: trigger tcp_push_pending_frames() for PING and PING_ACK
drbd: move set_disk_ro() to after we persisted the new role
drbd: explicitly submit meta data requests with REQ_NOIDLE
drbd: close race when detaching from disk
drbd: make sure disk cleanup happens in worker context
drbd: use drbd_device_post_work() in more places
drbd: get rid of drbd_queue_work_front

Philipp Reisner (3):
drbd: Move write_ordering from connection to resource
drbd: device->ldev is not guaranteed on an D_ATTACHING disk
drbd: rename drbd_free_bc() to drbd_free_ldev()

drivers/block/drbd/drbd_actlog.c | 438 ++++++++++++-------------------------
drivers/block/drbd/drbd_bitmap.c | 57 +----
drivers/block/drbd/drbd_int.h | 123 ++++++-----
drivers/block/drbd/drbd_main.c | 133 ++---------
drivers/block/drbd/drbd_nl.c | 100 +++++----
drivers/block/drbd/drbd_proc.c | 2 +-
drivers/block/drbd/drbd_receiver.c | 86 +++++---
drivers/block/drbd/drbd_req.c | 47 ++--
drivers/block/drbd/drbd_state.c | 79 ++++---
drivers/block/drbd/drbd_worker.c | 231 +++++++++++++++----
include/linux/drbd.h | 2 +-
11 files changed, 615 insertions(+), 683 deletions(-)

--
1.9.1


2014-06-30 15:53:15

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 04/18] drbd: reduce number of spinlock drop/re-aquire cycles

From: Lars Ellenberg <[email protected]>

Instead of dropping and re-aquiring the spinlock around the submit,
just remember that we want to submit, and do that only once we have
dropped the spinlock for good.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_req.c | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index 09803d0..4c7fee1 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -1086,11 +1086,13 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long

static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req)
{
+ struct drbd_resource *resource = device->resource;
const int rw = bio_rw(req->master_bio);
struct bio_and_error m = { NULL, };
bool no_remote = false;
+ bool submit_private_bio = false;

- spin_lock_irq(&device->resource->req_lock);
+ spin_lock_irq(&resource->req_lock);
if (rw == WRITE) {
/* This may temporarily give up the req_lock,
* but will re-aquire it before it returns here.
@@ -1152,9 +1154,7 @@ static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request
/* needs to be marked within the same spinlock */
_req_mod(req, TO_BE_SUBMITTED);
/* but we need to give up the spinlock to submit */
- spin_unlock_irq(&device->resource->req_lock);
- drbd_submit_req_private_bio(req);
- spin_lock_irq(&device->resource->req_lock);
+ submit_private_bio = true;
} else if (no_remote) {
nodata:
if (__ratelimit(&drbd_ratelimit_state))
@@ -1167,8 +1167,16 @@ nodata:
out:
if (drbd_req_put_completion_ref(req, &m, 1))
kref_put(&req->kref, drbd_req_destroy);
- spin_unlock_irq(&device->resource->req_lock);
-
+ spin_unlock_irq(&resource->req_lock);
+
+ /* Even though above is a kref_put(), this is safe.
+ * As long as we still need to submit our private bio,
+ * we hold a completion ref, and the request cannot disappear.
+ * If however this request did not even have a private bio to submit
+ * (e.g. remote read), req may already be invalid now.
+ * That's why we cannot check on req->private_bio. */
+ if (submit_private_bio)
+ drbd_submit_req_private_bio(req);
if (m.bio)
complete_master_bio(device, &m);
}
--
1.9.1

2014-06-30 15:54:06

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 15/18] drbd: close race when detaching from disk

From: Lars Ellenberg <[email protected]>

BUG: unable to handle kernel NULL pointer dereference at 0000000000000058
IP: bd_release+0x21/0x70
Process drbd_w_t7146
Call Trace:
close_bdev_exclusive
drbd_free_ldev [drbd]
drbd_ldev_destroy [drbd]
w_after_state_ch [drbd]

Race probably went like this:
state.disk = D_FAILED

... first one to hit zero during D_FAILED:
put_ldev() /* ----------------> 0 */
i = atomic_dec_return()
if (i == 0)
if (state.disk == D_FAILED)
schedule_work(go_diskless)
/* 1 <------ */ get_ldev_if_state()
go_diskless()
do_some_pre_cleanup() corresponding put_ldev():
force_state(D_DISKLESS) /* 0 <------ */ i = atomic_dec_return()
if (i == 0)
atomic_inc() /* ---------> 1 */
state.disk = D_DISKLESS
schedule_work(after_state_ch) /* execution pre-empted by IRQ ? */

after_state_ch()
put_ldev()
i = atomic_dec_return() /* 0 */
if (i == 0)
if (state.disk == D_DISKLESS) if (state.disk == D_DISKLESS)
drbd_ldev_destroy() drbd_ldev_destroy();

Trying to fix this by checking the disk state *before* the
atomic_dec_return(), which implies memory barriers, and by inserting
extra memory barriers around the state assignment in __drbd_set_state().

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_int.h | 9 +++++++--
drivers/block/drbd/drbd_state.c | 11 +++++++----
2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index a16f9ae..a0ffc19 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -1946,6 +1946,11 @@ static inline bool is_sync_state(enum drbd_conns connection_state)

static inline void put_ldev(struct drbd_device *device)
{
+ enum drbd_disk_state ds = device->state.disk;
+ /* We must check the state *before* the atomic_dec becomes visible,
+ * or we have a theoretical race where someone hitting zero,
+ * while state still D_FAILED, will then see D_DISKLESS in the
+ * condition below and calling into destroy, where he must not, yet. */
int i = atomic_dec_return(&device->local_cnt);

/* This may be called from some endio handler,
@@ -1954,10 +1959,10 @@ static inline void put_ldev(struct drbd_device *device)
__release(local);
D_ASSERT(device, i >= 0);
if (i == 0) {
- if (device->state.disk == D_DISKLESS)
+ if (ds == D_DISKLESS)
/* even internal references gone, safe to destroy */
drbd_ldev_destroy(device);
- if (device->state.disk == D_FAILED) {
+ if (ds == D_FAILED) {
/* all application IO references gone. */
if (!test_and_set_bit(GO_DISKLESS, &device->flags))
drbd_queue_work(&first_peer_device(device)->connection->sender_work,
diff --git a/drivers/block/drbd/drbd_state.c b/drivers/block/drbd/drbd_state.c
index 1bddd6c..6629f46 100644
--- a/drivers/block/drbd/drbd_state.c
+++ b/drivers/block/drbd/drbd_state.c
@@ -958,7 +958,6 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
enum drbd_state_rv rv = SS_SUCCESS;
enum sanitize_state_warnings ssw;
struct after_state_chg_work *ascw;
- bool did_remote, should_do_remote;

os = drbd_read_state(device);

@@ -1010,18 +1009,22 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
(os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
atomic_inc(&device->local_cnt);

- did_remote = drbd_should_do_remote(device->state);
if (!is_sync_state(os.conn) && is_sync_state(ns.conn))
clear_bit(RS_DONE, &device->flags);

+ /* changes to local_cnt and device flags should be visible before
+ * changes to state, which again should be visible before anything else
+ * depending on that change happens. */
+ smp_wmb();
device->state.i = ns.i;
- should_do_remote = drbd_should_do_remote(device->state);
device->resource->susp = ns.susp;
device->resource->susp_nod = ns.susp_nod;
device->resource->susp_fen = ns.susp_fen;
+ smp_wmb();

/* put replicated vs not-replicated requests in seperate epochs */
- if (did_remote != should_do_remote)
+ if (drbd_should_do_remote((union drbd_dev_state)os.i) !=
+ drbd_should_do_remote((union drbd_dev_state)ns.i))
start_new_tl_epoch(connection);

if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING)
--
1.9.1

2014-06-30 15:54:05

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 17/18] drbd: use drbd_device_post_work() in more places

From: Lars Ellenberg <[email protected]>

This replaces the md_sync_work member of struct drbd_device
by a new MD_SYNC "work bit" in device->flags.

This replaces the resync_start_work member of struct drbd_device
by a new RS_START "work bit" in device->flags.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_int.h | 16 ++--------------
drivers/block/drbd/drbd_main.c | 25 +------------------------
drivers/block/drbd/drbd_worker.c | 27 +++++++++++++++++----------
3 files changed, 20 insertions(+), 48 deletions(-)

diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index 5768260..3c701b0 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -457,6 +457,8 @@ enum {
/* to be used in drbd_device_post_work() */
GO_DISKLESS, /* tell worker to schedule cleanup before detach */
DESTROY_DISK, /* tell worker to close backing devices and destroy related structures. */
+ MD_SYNC, /* tell worker to call drbd_md_sync() */
+ RS_START, /* tell worker to start resync/OV */
RS_PROGRESS, /* tell worker that resync made significant progress */
RS_DONE, /* tell worker that resync is done */
};
@@ -709,18 +711,10 @@ struct drbd_device {
unsigned long last_reattach_jif;
struct drbd_work resync_work;
struct drbd_work unplug_work;
- struct drbd_work md_sync_work;
- struct drbd_work start_resync_work;
struct timer_list resync_timer;
struct timer_list md_sync_timer;
struct timer_list start_resync_timer;
struct timer_list request_timer;
-#ifdef DRBD_DEBUG_MD_SYNC
- struct {
- unsigned int line;
- const char* func;
- } last_md_mark_dirty;
-#endif

/* Used after attach while negotiating new disk state. */
union drbd_state new_state_tmp;
@@ -977,13 +971,7 @@ extern void __drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must
extern void drbd_md_set_flag(struct drbd_device *device, int flags) __must_hold(local);
extern void drbd_md_clear_flag(struct drbd_device *device, int flags)__must_hold(local);
extern int drbd_md_test_flag(struct drbd_backing_dev *, int);
-#ifndef DRBD_DEBUG_MD_SYNC
extern void drbd_md_mark_dirty(struct drbd_device *device);
-#else
-#define drbd_md_mark_dirty(m) drbd_md_mark_dirty_(m, __LINE__ , __func__ )
-extern void drbd_md_mark_dirty_(struct drbd_device *device,
- unsigned int line, const char *func);
-#endif
extern void drbd_queue_bitmap_io(struct drbd_device *device,
int (*io_fn)(struct drbd_device *),
void (*done)(struct drbd_device *, int),
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 0cf6094..ed35d52 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -60,7 +60,6 @@
static DEFINE_MUTEX(drbd_main_mutex);
static int drbd_open(struct block_device *bdev, fmode_t mode);
static void drbd_release(struct gendisk *gd, fmode_t mode);
-static int w_md_sync(struct drbd_work *w, int unused);
static void md_sync_timer_fn(unsigned long data);
static int w_bitmap_io(struct drbd_work *w, int unused);

@@ -1928,15 +1927,11 @@ void drbd_init_set_defaults(struct drbd_device *device)
INIT_LIST_HEAD(&device->resync_reads);
INIT_LIST_HEAD(&device->resync_work.list);
INIT_LIST_HEAD(&device->unplug_work.list);
- INIT_LIST_HEAD(&device->md_sync_work.list);
- INIT_LIST_HEAD(&device->start_resync_work.list);
INIT_LIST_HEAD(&device->bm_io_work.w.list);

device->resync_work.cb = w_resync_timer;
device->unplug_work.cb = w_send_write_hint;
- device->md_sync_work.cb = w_md_sync;
device->bm_io_work.w.cb = w_bitmap_io;
- device->start_resync_work.cb = w_start_resync;

init_timer(&device->resync_timer);
init_timer(&device->md_sync_timer);
@@ -3623,25 +3618,7 @@ int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
static void md_sync_timer_fn(unsigned long data)
{
struct drbd_device *device = (struct drbd_device *) data;
-
- /* must not double-queue! */
- if (list_empty(&device->md_sync_work.list))
- drbd_queue_work_front(&first_peer_device(device)->connection->sender_work,
- &device->md_sync_work);
-}
-
-static int w_md_sync(struct drbd_work *w, int unused)
-{
- struct drbd_device *device =
- container_of(w, struct drbd_device, md_sync_work);
-
- drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
-#ifdef DEBUG
- drbd_warn(device, "last md_mark_dirty: %s:%u\n",
- device->last_md_mark_dirty.func, device->last_md_mark_dirty.line);
-#endif
- drbd_md_sync(device);
- return 0;
+ drbd_device_post_work(device, MD_SYNC);
}

const char *cmdname(enum drbd_packet cmd)
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 00bf490..a4310fd 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1606,26 +1606,20 @@ void drbd_rs_controller_reset(struct drbd_device *device)
void start_resync_timer_fn(unsigned long data)
{
struct drbd_device *device = (struct drbd_device *) data;
-
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
- &device->start_resync_work);
+ drbd_device_post_work(device, RS_START);
}

-int w_start_resync(struct drbd_work *w, int cancel)
+static void do_start_resync(struct drbd_device *device)
{
- struct drbd_device *device =
- container_of(w, struct drbd_device, start_resync_work);
-
if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
- drbd_warn(device, "w_start_resync later...\n");
+ drbd_warn(device, "postponing start_resync ...\n");
device->start_resync_timer.expires = jiffies + HZ/10;
add_timer(&device->start_resync_timer);
- return 0;
+ return;
}

drbd_start_resync(device, C_SYNC_SOURCE);
clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
- return 0;
}

/**
@@ -1882,9 +1876,18 @@ static void go_diskless(struct drbd_device *device)
drbd_force_state(device, NS(disk, D_DISKLESS));
}

+static int do_md_sync(struct drbd_device *device)
+{
+ drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
+ drbd_md_sync(device);
+ return 0;
+}
+
#define WORK_PENDING(work_bit, todo) (todo & (1UL << work_bit))
static void do_device_work(struct drbd_device *device, const unsigned long todo)
{
+ if (WORK_PENDING(MD_SYNC, todo))
+ do_md_sync(device);
if (WORK_PENDING(RS_DONE, todo) ||
WORK_PENDING(RS_PROGRESS, todo))
update_on_disk_bitmap(device, WORK_PENDING(RS_DONE, todo));
@@ -1892,11 +1895,15 @@ static void do_device_work(struct drbd_device *device, const unsigned long todo)
go_diskless(device);
if (WORK_PENDING(DESTROY_DISK, todo))
drbd_ldev_destroy(device);
+ if (WORK_PENDING(RS_START, todo))
+ do_start_resync(device);
}

#define DRBD_DEVICE_WORK_MASK \
((1UL << GO_DISKLESS) \
|(1UL << DESTROY_DISK) \
+ |(1UL << MD_SYNC) \
+ |(1UL << RS_START) \
|(1UL << RS_PROGRESS) \
|(1UL << RS_DONE) \
)
--
1.9.1

2014-06-30 15:54:03

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 18/18] drbd: get rid of drbd_queue_work_front

From: Lars Ellenberg <[email protected]>

The last user was al_write_transaction, if called with "delegate",
and the last user to call it with "delegate = true" was the receiver
thread, which has no need to delegate, but can call it himself.

Finally drop the delegate parameter, drop the extra
w_al_write_transaction callback, and drop drbd_queue_work_front.

Do not (yet) change dequeue_work_item to dequeue_work_batch, though.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_actlog.c | 69 ++++----------------------------------
drivers/block/drbd/drbd_int.h | 14 ++------
drivers/block/drbd/drbd_receiver.c | 2 +-
drivers/block/drbd/drbd_req.c | 2 +-
drivers/block/drbd/drbd_worker.c | 6 ++--
5 files changed, 12 insertions(+), 81 deletions(-)

diff --git a/drivers/block/drbd/drbd_actlog.c b/drivers/block/drbd/drbd_actlog.c
index 5add201..5f82a36 100644
--- a/drivers/block/drbd/drbd_actlog.c
+++ b/drivers/block/drbd/drbd_actlog.c
@@ -92,14 +92,6 @@ struct __packed al_transaction_on_disk {
__be32 context[AL_CONTEXT_PER_TRANSACTION];
};

-struct update_al_work {
- struct drbd_work w;
- struct drbd_device *device;
- struct completion event;
- int err;
-};
-
-
void *drbd_md_get_buffer(struct drbd_device *device)
{
int r;
@@ -291,26 +283,12 @@ bool drbd_al_begin_io_prepare(struct drbd_device *device, struct drbd_interval *
return need_transaction;
}

-static int al_write_transaction(struct drbd_device *device, bool delegate);
+static int al_write_transaction(struct drbd_device *device);

-/* When called through generic_make_request(), we must delegate
- * activity log I/O to the worker thread: a further request
- * submitted via generic_make_request() within the same task
- * would be queued on current->bio_list, and would only start
- * after this function returns (see generic_make_request()).
- *
- * However, if we *are* the worker, we must not delegate to ourselves.
- */
-
-/*
- * @delegate: delegate activity log I/O to the worker thread
- */
-void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate)
+void drbd_al_begin_io_commit(struct drbd_device *device)
{
bool locked = false;

- BUG_ON(delegate && current == first_peer_device(device)->connection->worker.task);
-
/* Serialize multiple transactions.
* This uses test_and_set_bit, memory barrier is implicit.
*/
@@ -329,7 +307,7 @@ void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate)
rcu_read_unlock();

if (write_al_updates)
- al_write_transaction(device, delegate);
+ al_write_transaction(device);
spin_lock_irq(&device->al_lock);
/* FIXME
if (err)
@@ -346,12 +324,10 @@ void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate)
/*
* @delegate: delegate activity log I/O to the worker thread
*/
-void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i, bool delegate)
+void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i)
{
- BUG_ON(delegate && current == first_peer_device(device)->connection->worker.task);
-
if (drbd_al_begin_io_prepare(device, i))
- drbd_al_begin_io_commit(device, delegate);
+ drbd_al_begin_io_commit(device);
}

int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval *i)
@@ -464,8 +440,7 @@ static sector_t al_tr_number_to_on_disk_sector(struct drbd_device *device)
return device->ldev->md.md_offset + device->ldev->md.al_offset + t;
}

-static int
-_al_write_transaction(struct drbd_device *device)
+int al_write_transaction(struct drbd_device *device)
{
struct al_transaction_on_disk *buffer;
struct lc_element *e;
@@ -575,38 +550,6 @@ _al_write_transaction(struct drbd_device *device)
return err;
}

-
-static int w_al_write_transaction(struct drbd_work *w, int unused)
-{
- struct update_al_work *aw = container_of(w, struct update_al_work, w);
- struct drbd_device *device = aw->device;
- int err;
-
- err = _al_write_transaction(device);
- aw->err = err;
- complete(&aw->event);
-
- return err != -EIO ? err : 0;
-}
-
-/* Calls from worker context (see w_restart_disk_io()) need to write the
- transaction directly. Others came through generic_make_request(),
- those need to delegate it to the worker. */
-static int al_write_transaction(struct drbd_device *device, bool delegate)
-{
- if (delegate) {
- struct update_al_work al_work;
- init_completion(&al_work.event);
- al_work.w.cb = w_al_write_transaction;
- al_work.device = device;
- drbd_queue_work_front(&first_peer_device(device)->connection->sender_work,
- &al_work.w);
- wait_for_completion(&al_work.event);
- return al_work.err;
- } else
- return _al_write_transaction(device);
-}
-
static int _try_lc_del(struct drbd_device *device, struct lc_element *al_ext)
{
int rv;
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index 3c701b0..72b5001 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -1491,9 +1491,9 @@ extern const char *drbd_role_str(enum drbd_role s);
/* drbd_actlog.c */
extern bool drbd_al_begin_io_prepare(struct drbd_device *device, struct drbd_interval *i);
extern int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval *i);
-extern void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate);
+extern void drbd_al_begin_io_commit(struct drbd_device *device);
extern bool drbd_al_begin_io_fastpath(struct drbd_device *device, struct drbd_interval *i);
-extern void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i, bool delegate);
+extern void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i);
extern void drbd_al_complete_io(struct drbd_device *device, struct drbd_interval *i);
extern void drbd_rs_complete_io(struct drbd_device *device, sector_t sector);
extern int drbd_rs_begin_io(struct drbd_device *device, sector_t sector);
@@ -1769,16 +1769,6 @@ static inline sector_t drbd_md_ss(struct drbd_backing_dev *bdev)
}

static inline void
-drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w)
-{
- unsigned long flags;
- spin_lock_irqsave(&q->q_lock, flags);
- list_add(&w->list, &q->q);
- spin_unlock_irqrestore(&q->q_lock, flags);
- wake_up(&q->q_wait);
-}
-
-static inline void
drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
{
unsigned long flags;
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index 8d8a6b7..cfc3d43 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -2351,7 +2351,7 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
- drbd_al_begin_io(device, &peer_req->i, true);
+ drbd_al_begin_io(device, &peer_req->i);
}

err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index 042bbc6..1ee7355 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -1286,7 +1286,7 @@ skip_fast_path:
if (!made_progress)
break;
}
- drbd_al_begin_io_commit(device, false);
+ drbd_al_begin_io_commit(device);

list_for_each_entry_safe(req, tmp, &pending, tl_requests) {
list_del_init(&req->tl_requests);
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index a4310fd..2ff5fd4 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1438,7 +1438,7 @@ int w_restart_disk_io(struct drbd_work *w, int cancel)
struct drbd_device *device = req->device;

if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
- drbd_al_begin_io(device, &req->i, false);
+ drbd_al_begin_io(device, &req->i);

drbd_req_make_private_bio(req, req->master_bio);
req->private_bio->bi_bdev = device->ldev->backing_bdev;
@@ -1991,7 +1991,7 @@ static void wait_for_work(struct drbd_connection *connection, struct list_head *
/* dequeue single item only,
* we still use drbd_queue_work_front() in some places */
if (!list_empty(&connection->sender_work.q))
- list_move(connection->sender_work.q.next, work_list);
+ list_splice_tail_init(&connection->sender_work.q, work_list);
spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
if (!list_empty(work_list) || signal_pending(current)) {
spin_unlock_irq(&connection->resource->req_lock);
@@ -2054,8 +2054,6 @@ int drbd_worker(struct drbd_thread *thi)
while (get_t_state(thi) == RUNNING) {
drbd_thread_current_set_cpu(thi);

- /* as long as we use drbd_queue_work_front(),
- * we may only dequeue single work items here, not batches. */
if (list_empty(&work_list))
wait_for_work(connection, &work_list);

--
1.9.1

2014-06-30 15:54:00

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 16/18] drbd: make sure disk cleanup happens in worker context

From: Lars Ellenberg <[email protected]>

The recent fix to put_ldev() (correct ordering of access to local_cnt
and state.disk; memory barrier in __drbd_set_state) guarantees
that the cleanup happens exactly once.

However it does not yet guarantee that the cleanup happens from worker
context, the last put_ldev() may still happen from atomic context,
which must not happen: blkdev_put() may sleep.

Fix this by scheduling the cleanup to the worker instead,
using a couple more bits in device->flags and a new helper,
drbd_device_post_work().

Generalized the "resync progress" work to cover these new work bits.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_actlog.c | 9 +---
drivers/block/drbd/drbd_int.h | 40 +++++++++++-----
drivers/block/drbd/drbd_main.c | 59 -----------------------
drivers/block/drbd/drbd_nl.c | 2 +-
drivers/block/drbd/drbd_worker.c | 100 +++++++++++++++++++++++++++++++++++----
5 files changed, 120 insertions(+), 90 deletions(-)

diff --git a/drivers/block/drbd/drbd_actlog.c b/drivers/block/drbd/drbd_actlog.c
index 14b2f27..5add201 100644
--- a/drivers/block/drbd/drbd_actlog.c
+++ b/drivers/block/drbd/drbd_actlog.c
@@ -797,20 +797,13 @@ static bool lazy_bitmap_update_due(struct drbd_device *device)

static void maybe_schedule_on_disk_bitmap_update(struct drbd_device *device, bool rs_done)
{
- struct drbd_connection *connection;
if (rs_done)
set_bit(RS_DONE, &device->flags);
/* and also set RS_PROGRESS below */
else if (!lazy_bitmap_update_due(device))
return;

- /* compare with test_and_clear_bit() calls in and above
- * try_update_all_on_disk_bitmaps() from the drbd_worker(). */
- if (test_and_set_bit(RS_PROGRESS, &device->flags))
- return;
- connection = first_peer_device(device)->connection;
- if (!test_and_set_bit(CONN_RS_PROGRESS, &connection->flags))
- wake_up(&connection->sender_work.q_wait);
+ drbd_device_post_work(device, RS_PROGRESS);
}

static int update_sync_bits(struct drbd_device *device,
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index a0ffc19..5768260 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -432,16 +432,12 @@ enum {
* goes into C_CONNECTED state. */
CONSIDER_RESYNC,

- RS_PROGRESS, /* tell worker that resync made significant progress */
- RS_DONE, /* tell worker that resync is done */
-
MD_NO_FUA, /* Users wants us to not use FUA/FLUSH on meta data dev */

SUSPEND_IO, /* suspend application io */
BITMAP_IO, /* suspend application io;
once no more io in flight, start bitmap io */
BITMAP_IO_QUEUED, /* Started bitmap IO */
- GO_DISKLESS, /* Disk is being detached, on io-error or admin request. */
WAS_IO_ERROR, /* Local disk failed, returned IO error */
WAS_READ_ERROR, /* Local disk READ failed (set additionally to the above) */
FORCE_DETACH, /* Force-detach from local disk, aborting any pending local IO */
@@ -454,6 +450,15 @@ enum {
B_RS_H_DONE, /* Before resync handler done (already executed) */
DISCARD_MY_DATA, /* discard_my_data flag per volume */
READ_BALANCE_RR,
+
+ /* cleared only after backing device related structures have been destroyed. */
+ GOING_DISKLESS, /* Disk is being detached, because of io-error, or admin request. */
+
+ /* to be used in drbd_device_post_work() */
+ GO_DISKLESS, /* tell worker to schedule cleanup before detach */
+ DESTROY_DISK, /* tell worker to close backing devices and destroy related structures. */
+ RS_PROGRESS, /* tell worker that resync made significant progress */
+ RS_DONE, /* tell worker that resync is done */
};

struct drbd_bitmap; /* opaque for drbd_device */
@@ -581,7 +586,8 @@ enum {
* and potentially deadlock on, this drbd worker.
*/
DISCONNECT_SENT,
- CONN_RS_PROGRESS, /* tell worker that resync made significant progress */
+
+ DEVICE_WORK_PENDING, /* tell worker that some device has pending work */
};

struct drbd_resource {
@@ -703,7 +709,6 @@ struct drbd_device {
unsigned long last_reattach_jif;
struct drbd_work resync_work;
struct drbd_work unplug_work;
- struct drbd_work go_diskless;
struct drbd_work md_sync_work;
struct drbd_work start_resync_work;
struct timer_list resync_timer;
@@ -991,7 +996,6 @@ extern int drbd_bitmap_io_from_worker(struct drbd_device *device,
char *why, enum bm_flag flags);
extern int drbd_bmio_set_n_write(struct drbd_device *device) __must_hold(local);
extern int drbd_bmio_clear_n_write(struct drbd_device *device) __must_hold(local);
-extern void drbd_ldev_destroy(struct drbd_device *device);

/* Meta data layout
*
@@ -1796,6 +1800,18 @@ drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
wake_up(&q->q_wait);
}

+static inline void
+drbd_device_post_work(struct drbd_device *device, int work_bit)
+{
+ if (!test_and_set_bit(work_bit, &device->flags)) {
+ struct drbd_connection *connection =
+ first_peer_device(device)->connection;
+ struct drbd_work_queue *q = &connection->sender_work;
+ if (!test_and_set_bit(DEVICE_WORK_PENDING, &connection->flags))
+ wake_up(&q->q_wait);
+ }
+}
+
extern void drbd_flush_workqueue(struct drbd_work_queue *work_queue);

static inline void wake_asender(struct drbd_connection *connection)
@@ -1961,13 +1977,11 @@ static inline void put_ldev(struct drbd_device *device)
if (i == 0) {
if (ds == D_DISKLESS)
/* even internal references gone, safe to destroy */
- drbd_ldev_destroy(device);
- if (ds == D_FAILED) {
+ drbd_device_post_work(device, DESTROY_DISK);
+ if (ds == D_FAILED)
/* all application IO references gone. */
- if (!test_and_set_bit(GO_DISKLESS, &device->flags))
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
- &device->go_diskless);
- }
+ if (!test_and_set_bit(GOING_DISKLESS, &device->flags))
+ drbd_device_post_work(device, GO_DISKLESS);
wake_up(&device->misc_wait);
}
}
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 77d34a6..0cf6094 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -63,7 +63,6 @@ static void drbd_release(struct gendisk *gd, fmode_t mode);
static int w_md_sync(struct drbd_work *w, int unused);
static void md_sync_timer_fn(unsigned long data);
static int w_bitmap_io(struct drbd_work *w, int unused);
-static int w_go_diskless(struct drbd_work *w, int unused);

MODULE_AUTHOR("Philipp Reisner <[email protected]>, "
"Lars Ellenberg <[email protected]>");
@@ -1929,14 +1928,12 @@ void drbd_init_set_defaults(struct drbd_device *device)
INIT_LIST_HEAD(&device->resync_reads);
INIT_LIST_HEAD(&device->resync_work.list);
INIT_LIST_HEAD(&device->unplug_work.list);
- INIT_LIST_HEAD(&device->go_diskless.list);
INIT_LIST_HEAD(&device->md_sync_work.list);
INIT_LIST_HEAD(&device->start_resync_work.list);
INIT_LIST_HEAD(&device->bm_io_work.w.list);

device->resync_work.cb = w_resync_timer;
device->unplug_work.cb = w_send_write_hint;
- device->go_diskless.cb = w_go_diskless;
device->md_sync_work.cb = w_md_sync;
device->bm_io_work.w.cb = w_bitmap_io;
device->start_resync_work.cb = w_start_resync;
@@ -2011,7 +2008,6 @@ void drbd_device_cleanup(struct drbd_device *device)
D_ASSERT(device, list_empty(&first_peer_device(device)->connection->sender_work.q));
D_ASSERT(device, list_empty(&device->resync_work.list));
D_ASSERT(device, list_empty(&device->unplug_work.list));
- D_ASSERT(device, list_empty(&device->go_diskless.list));

drbd_set_defaults(device);
}
@@ -3531,61 +3527,6 @@ static int w_bitmap_io(struct drbd_work *w, int unused)
return 0;
}

-void drbd_ldev_destroy(struct drbd_device *device)
-{
- lc_destroy(device->resync);
- device->resync = NULL;
- lc_destroy(device->act_log);
- device->act_log = NULL;
- __no_warn(local,
- drbd_free_ldev(device->ldev);
- device->ldev = NULL;);
-
- clear_bit(GO_DISKLESS, &device->flags);
-}
-
-static int w_go_diskless(struct drbd_work *w, int unused)
-{
- struct drbd_device *device =
- container_of(w, struct drbd_device, go_diskless);
-
- D_ASSERT(device, device->state.disk == D_FAILED);
- /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
- * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
- * the protected members anymore, though, so once put_ldev reaches zero
- * again, it will be safe to free them. */
-
- /* Try to write changed bitmap pages, read errors may have just
- * set some bits outside the area covered by the activity log.
- *
- * If we have an IO error during the bitmap writeout,
- * we will want a full sync next time, just in case.
- * (Do we want a specific meta data flag for this?)
- *
- * If that does not make it to stable storage either,
- * we cannot do anything about that anymore.
- *
- * We still need to check if both bitmap and ldev are present, we may
- * end up here after a failed attach, before ldev was even assigned.
- */
- if (device->bitmap && device->ldev) {
- /* An interrupted resync or similar is allowed to recounts bits
- * while we detach.
- * Any modifications would not be expected anymore, though.
- */
- if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
- "detach", BM_LOCKED_TEST_ALLOWED)) {
- if (test_bit(WAS_READ_ERROR, &device->flags)) {
- drbd_md_set_flag(device, MDF_FULL_SYNC);
- drbd_md_sync(device);
- }
- }
- }
-
- drbd_force_state(device, NS(disk, D_DISKLESS));
- return 0;
-}
-
/**
* drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
* @device: DRBD device.
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index ceebd31..628167d 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -1482,7 +1482,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
* drbd_ldev_destroy is done already, we may end up here very fast,
* e.g. if someone calls attach from the on-io-error handler,
* to realize a "hot spare" feature (not that I'd recommend that) */
- wait_event(device->misc_wait, !atomic_read(&device->local_cnt));
+ wait_event(device->misc_wait, !test_bit(GOING_DISKLESS, &device->flags));

/* make sure there is no leftover from previous force-detach attempts */
clear_bit(FORCE_DETACH, &device->flags);
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index bafb62e..00bf490 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1813,10 +1813,9 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
mutex_unlock(device->state_mutex);
}

-static void update_on_disk_bitmap(struct drbd_device *device)
+static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
{
struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
- bool resync_done = test_and_clear_bit(RS_DONE, &device->flags);
device->rs_last_bcast = jiffies;

if (!get_ldev(device))
@@ -1832,7 +1831,87 @@ static void update_on_disk_bitmap(struct drbd_device *device)
put_ldev(device);
}

-static void try_update_all_on_disk_bitmaps(struct drbd_connection *connection)
+static void drbd_ldev_destroy(struct drbd_device *device)
+{
+ lc_destroy(device->resync);
+ device->resync = NULL;
+ lc_destroy(device->act_log);
+ device->act_log = NULL;
+ __no_warn(local,
+ drbd_free_ldev(device->ldev);
+ device->ldev = NULL;);
+ clear_bit(GOING_DISKLESS, &device->flags);
+ wake_up(&device->misc_wait);
+}
+
+static void go_diskless(struct drbd_device *device)
+{
+ D_ASSERT(device, device->state.disk == D_FAILED);
+ /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
+ * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
+ * the protected members anymore, though, so once put_ldev reaches zero
+ * again, it will be safe to free them. */
+
+ /* Try to write changed bitmap pages, read errors may have just
+ * set some bits outside the area covered by the activity log.
+ *
+ * If we have an IO error during the bitmap writeout,
+ * we will want a full sync next time, just in case.
+ * (Do we want a specific meta data flag for this?)
+ *
+ * If that does not make it to stable storage either,
+ * we cannot do anything about that anymore.
+ *
+ * We still need to check if both bitmap and ldev are present, we may
+ * end up here after a failed attach, before ldev was even assigned.
+ */
+ if (device->bitmap && device->ldev) {
+ /* An interrupted resync or similar is allowed to recounts bits
+ * while we detach.
+ * Any modifications would not be expected anymore, though.
+ */
+ if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
+ "detach", BM_LOCKED_TEST_ALLOWED)) {
+ if (test_bit(WAS_READ_ERROR, &device->flags)) {
+ drbd_md_set_flag(device, MDF_FULL_SYNC);
+ drbd_md_sync(device);
+ }
+ }
+ }
+
+ drbd_force_state(device, NS(disk, D_DISKLESS));
+}
+
+#define WORK_PENDING(work_bit, todo) (todo & (1UL << work_bit))
+static void do_device_work(struct drbd_device *device, const unsigned long todo)
+{
+ if (WORK_PENDING(RS_DONE, todo) ||
+ WORK_PENDING(RS_PROGRESS, todo))
+ update_on_disk_bitmap(device, WORK_PENDING(RS_DONE, todo));
+ if (WORK_PENDING(GO_DISKLESS, todo))
+ go_diskless(device);
+ if (WORK_PENDING(DESTROY_DISK, todo))
+ drbd_ldev_destroy(device);
+}
+
+#define DRBD_DEVICE_WORK_MASK \
+ ((1UL << GO_DISKLESS) \
+ |(1UL << DESTROY_DISK) \
+ |(1UL << RS_PROGRESS) \
+ |(1UL << RS_DONE) \
+ )
+
+static unsigned long get_work_bits(unsigned long *flags)
+{
+ unsigned long old, new;
+ do {
+ old = *flags;
+ new = old & ~DRBD_DEVICE_WORK_MASK;
+ } while (cmpxchg(flags, old, new) != old);
+ return old & DRBD_DEVICE_WORK_MASK;
+}
+
+static void do_unqueued_work(struct drbd_connection *connection)
{
struct drbd_peer_device *peer_device;
int vnr;
@@ -1840,12 +1919,13 @@ static void try_update_all_on_disk_bitmaps(struct drbd_connection *connection)
rcu_read_lock();
idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
struct drbd_device *device = peer_device->device;
- if (!test_and_clear_bit(RS_PROGRESS, &device->flags))
+ unsigned long todo = get_work_bits(&device->flags);
+ if (!todo)
continue;

kref_get(&device->kref);
rcu_read_unlock();
- update_on_disk_bitmap(device);
+ do_device_work(device, todo);
kref_put(&device->kref, drbd_destroy_device);
rcu_read_lock();
}
@@ -1927,7 +2007,7 @@ static void wait_for_work(struct drbd_connection *connection, struct list_head *
maybe_send_barrier(connection,
connection->send.current_epoch_nr + 1);

- if (test_bit(CONN_RS_PROGRESS, &connection->flags))
+ if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
break;

/* drbd_send() may have called flush_signals() */
@@ -1972,8 +2052,8 @@ int drbd_worker(struct drbd_thread *thi)
if (list_empty(&work_list))
wait_for_work(connection, &work_list);

- if (test_and_clear_bit(CONN_RS_PROGRESS, &connection->flags))
- try_update_all_on_disk_bitmaps(connection);
+ if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags))
+ do_unqueued_work(connection);

if (signal_pending(current)) {
flush_signals(current);
@@ -1998,13 +2078,15 @@ int drbd_worker(struct drbd_thread *thi)
}

do {
+ if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags))
+ do_unqueued_work(connection);
while (!list_empty(&work_list)) {
w = list_first_entry(&work_list, struct drbd_work, list);
list_del_init(&w->list);
w->cb(w, 1);
}
dequeue_work_batch(&connection->sender_work, &work_list);
- } while (!list_empty(&work_list));
+ } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));

rcu_read_lock();
idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
--
1.9.1

2014-06-30 15:56:09

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 13/18] drbd: move set_disk_ro() to after we persisted the new role

From: Lars Ellenberg <[email protected]>

This probably does not have any real life impact,
but we should first persist any potentially new UUID
and other meta data flags, as well as our new role,
before we allow/disallow write access.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_nl.c | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 23670d8..ceebd31 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -655,7 +655,6 @@ drbd_set_role(struct drbd_device *const device, enum drbd_role new_role, int for
/* FIXME also wait for all pending P_BARRIER_ACK? */

if (new_role == R_SECONDARY) {
- set_disk_ro(device->vdisk, true);
if (get_ldev(device)) {
device->ldev->md.uuid[UI_CURRENT] &= ~(u64)1;
put_ldev(device);
@@ -667,7 +666,6 @@ drbd_set_role(struct drbd_device *const device, enum drbd_role new_role, int for
nc->discard_my_data = 0; /* without copy; single bit op is atomic */
mutex_unlock(&device->resource->conf_update);

- set_disk_ro(device->vdisk, false);
if (get_ldev(device)) {
if (((device->state.conn < C_CONNECTED ||
device->state.pdsk <= D_FAILED)
@@ -690,7 +688,7 @@ drbd_set_role(struct drbd_device *const device, enum drbd_role new_role, int for
}

drbd_md_sync(device);
-
+ set_disk_ro(device->vdisk, new_role == R_SECONDARY);
kobject_uevent(&disk_to_dev(device->vdisk)->kobj, KOBJ_CHANGE);
out:
mutex_unlock(device->state_mutex);
--
1.9.1

2014-06-30 15:56:07

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 14/18] drbd: explicitly submit meta data requests with REQ_NOIDLE

From: Lars Ellenberg <[email protected]>

For some reason we have assumed NOIDLE was implied
by one of the other flags we set. It is not (anymore?).
Explicitly set REQ_NOIDLE for synchronous meta data updates,
or we can seriously starve random writes when using CFQ.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_actlog.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/drivers/block/drbd/drbd_actlog.c b/drivers/block/drbd/drbd_actlog.c
index 278c31f..14b2f27 100644
--- a/drivers/block/drbd/drbd_actlog.c
+++ b/drivers/block/drbd/drbd_actlog.c
@@ -150,7 +150,7 @@ static int _drbd_md_sync_page_io(struct drbd_device *device,

if ((rw & WRITE) && !test_bit(MD_NO_FUA, &device->flags))
rw |= REQ_FUA | REQ_FLUSH;
- rw |= REQ_SYNC;
+ rw |= REQ_SYNC | REQ_NOIDLE;

bio = bio_alloc_drbd(GFP_NOIO);
bio->bi_bdev = bdev->md_bdev;
--
1.9.1

2014-06-30 15:53:13

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 03/18] drbd: rename drbd_free_bc() to drbd_free_ldev()

Since the member of drbd_device is called ldev

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_int.h | 2 +-
drivers/block/drbd/drbd_main.c | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index c87bc8e..82ece1b 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -950,7 +950,7 @@ extern int drbd_send_ov_request(struct drbd_peer_device *, sector_t sector, int
extern int drbd_send_bitmap(struct drbd_device *device);
extern void drbd_send_sr_reply(struct drbd_peer_device *, enum drbd_state_rv retcode);
extern void conn_send_sr_reply(struct drbd_connection *connection, enum drbd_state_rv retcode);
-extern void drbd_free_bc(struct drbd_backing_dev *ldev);
+extern void drbd_free_ldev(struct drbd_backing_dev *ldev);
extern void drbd_device_cleanup(struct drbd_device *device);
void drbd_print_uuids(struct drbd_device *device, const char *text);

diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index a6af935..8e0813d 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -1992,7 +1992,7 @@ void drbd_device_cleanup(struct drbd_device *device)
drbd_bm_cleanup(device);
}

- drbd_free_bc(device->ldev);
+ drbd_free_ldev(device->ldev);
device->ldev = NULL;

clear_bit(AL_SUSPENDED, &device->flags);
@@ -2187,7 +2187,7 @@ void drbd_destroy_device(struct kref *kref)
if (device->this_bdev)
bdput(device->this_bdev);

- drbd_free_bc(device->ldev);
+ drbd_free_ldev(device->ldev);
device->ldev = NULL;

drbd_release_all_peer_reqs(device);
@@ -2960,7 +2960,7 @@ fail:
return err;
}

-void drbd_free_bc(struct drbd_backing_dev *ldev)
+void drbd_free_ldev(struct drbd_backing_dev *ldev)
{
if (ldev == NULL)
return;
@@ -3533,7 +3533,7 @@ void drbd_ldev_destroy(struct drbd_device *device)
lc_destroy(device->act_log);
device->act_log = NULL;
__no_warn(local,
- drbd_free_bc(device->ldev);
+ drbd_free_ldev(device->ldev);
device->ldev = NULL;);

clear_bit(GO_DISKLESS, &device->flags);
--
1.9.1

2014-06-30 15:57:05

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 09/18] drbd: fix resync finished detection

From: Lars Ellenberg <[email protected]>

This fixes one recent regresion,
and one long existing bug.

The bug:
drbd_try_clear_on_disk_bm() assumed that all "count" bits have to be
accounted in the resync extent corresponding to the start sector.

Since we allow application requests to cross our "extent" boundaries,
this assumption is no longer true, resulting in possible misaccounting,
scary messages
("BAD! sector=12345s enr=6 rs_left=-7 rs_failed=0 count=58 cstate=..."),
and potentially, if the last bit to be cleared during resync would
reside in previously misaccounted resync extent, the resync would never
be recognized as finished, but would be "stalled" forever, even though
all blocks are in sync again and all bits have been cleared...

The regression was introduced by
drbd: get rid of atomic update on disk bitmap works

For an "empty" resync (rs_total == 0), we must not "finish" the
resync on the SyncSource before the SyncTarget knows all relevant
information (sync uuid). We need to wait for the full round-trip,
the SyncTarget will then explicitly notify us.

Also for normal, non-empty resyncs (rs_total > 0), the resync-finished
condition needs to be tested before the schedule() in wait_for_work, or
it is likely to be missed.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_actlog.c | 315 ++++++++++++++++++---------------------
drivers/block/drbd/drbd_int.h | 42 ++++--
drivers/block/drbd/drbd_state.c | 3 +
drivers/block/drbd/drbd_worker.c | 42 +++---
4 files changed, 197 insertions(+), 205 deletions(-)

diff --git a/drivers/block/drbd/drbd_actlog.c b/drivers/block/drbd/drbd_actlog.c
index 9c42edf..278c31f 100644
--- a/drivers/block/drbd/drbd_actlog.c
+++ b/drivers/block/drbd/drbd_actlog.c
@@ -667,36 +667,56 @@ int drbd_initialize_al(struct drbd_device *device, void *buffer)
return 0;
}

+static const char *drbd_change_sync_fname[] = {
+ [RECORD_RS_FAILED] = "drbd_rs_failed_io",
+ [SET_IN_SYNC] = "drbd_set_in_sync",
+ [SET_OUT_OF_SYNC] = "drbd_set_out_of_sync"
+};
+
/* ATTENTION. The AL's extents are 4MB each, while the extents in the
* resync LRU-cache are 16MB each.
* The caller of this function has to hold an get_ldev() reference.
*
+ * Adjusts the caching members ->rs_left (success) or ->rs_failed (!success),
+ * potentially pulling in (and recounting the corresponding bits)
+ * this resync extent into the resync extent lru cache.
+ *
+ * Returns whether all bits have been cleared for this resync extent,
+ * precisely: (rs_left <= rs_failed)
+ *
* TODO will be obsoleted once we have a caching lru of the on disk bitmap
*/
-static void drbd_try_clear_on_disk_bm(struct drbd_device *device, sector_t sector,
- int count, int success)
+static bool update_rs_extent(struct drbd_device *device,
+ unsigned int enr, int count,
+ enum update_sync_bits_mode mode)
{
struct lc_element *e;
- unsigned int enr;

D_ASSERT(device, atomic_read(&device->local_cnt));

- /* I simply assume that a sector/size pair never crosses
- * a 16 MB extent border. (Currently this is true...) */
- enr = BM_SECT_TO_EXT(sector);
-
- e = lc_get(device->resync, enr);
+ /* When setting out-of-sync bits,
+ * we don't need it cached (lc_find).
+ * But if it is present in the cache,
+ * we should update the cached bit count.
+ * Otherwise, that extent should be in the resync extent lru cache
+ * already -- or we want to pull it in if necessary -- (lc_get),
+ * then update and check rs_left and rs_failed. */
+ if (mode == SET_OUT_OF_SYNC)
+ e = lc_find(device->resync, enr);
+ else
+ e = lc_get(device->resync, enr);
if (e) {
struct bm_extent *ext = lc_entry(e, struct bm_extent, lce);
if (ext->lce.lc_number == enr) {
- if (success)
+ if (mode == SET_IN_SYNC)
ext->rs_left -= count;
+ else if (mode == SET_OUT_OF_SYNC)
+ ext->rs_left += count;
else
ext->rs_failed += count;
if (ext->rs_left < ext->rs_failed) {
- drbd_warn(device, "BAD! sector=%llus enr=%u rs_left=%d "
+ drbd_warn(device, "BAD! enr=%u rs_left=%d "
"rs_failed=%d count=%d cstate=%s\n",
- (unsigned long long)sector,
ext->lce.lc_number, ext->rs_left,
ext->rs_failed, count,
drbd_conn_str(device->state.conn));
@@ -730,24 +750,27 @@ static void drbd_try_clear_on_disk_bm(struct drbd_device *device, sector_t secto
ext->lce.lc_number, ext->rs_failed);
}
ext->rs_left = rs_left;
- ext->rs_failed = success ? 0 : count;
+ ext->rs_failed = (mode == RECORD_RS_FAILED) ? count : 0;
/* we don't keep a persistent log of the resync lru,
* we can commit any change right away. */
lc_committed(device->resync);
}
- lc_put(device->resync, &ext->lce);
+ if (mode != SET_OUT_OF_SYNC)
+ lc_put(device->resync, &ext->lce);
/* no race, we are within the al_lock! */

- if (ext->rs_left == ext->rs_failed) {
+ if (ext->rs_left <= ext->rs_failed) {
ext->rs_failed = 0;
- wake_up(&first_peer_device(device)->connection->sender_work.q_wait);
+ return true;
}
- } else {
+ } else if (mode != SET_OUT_OF_SYNC) {
+ /* be quiet if lc_find() did not find it. */
drbd_err(device, "lc_get() failed! locked=%d/%d flags=%lu\n",
device->resync_locked,
device->resync->nr_elements,
device->resync->flags);
}
+ return false;
}

void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go)
@@ -766,105 +789,112 @@ void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go
}
}

-/* clear the bit corresponding to the piece of storage in question:
- * size byte of data starting from sector. Only clear a bits of the affected
- * one ore more _aligned_ BM_BLOCK_SIZE blocks.
- *
- * called by worker on C_SYNC_TARGET and receiver on SyncSource.
- *
- */
-void __drbd_set_in_sync(struct drbd_device *device, sector_t sector, int size,
- const char *file, const unsigned int line)
+/* It is called lazy update, so don't do write-out too often. */
+static bool lazy_bitmap_update_due(struct drbd_device *device)
{
- /* Is called from worker and receiver context _only_ */
- unsigned long sbnr, ebnr, lbnr;
- unsigned long count = 0;
- sector_t esector, nr_sectors;
- int wake_up = 0;
- unsigned long flags;
+ return time_after(jiffies, device->rs_last_bcast + 2*HZ);
+}

- if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
- drbd_err(device, "drbd_set_in_sync: sector=%llus size=%d nonsense!\n",
- (unsigned long long)sector, size);
+static void maybe_schedule_on_disk_bitmap_update(struct drbd_device *device, bool rs_done)
+{
+ struct drbd_connection *connection;
+ if (rs_done)
+ set_bit(RS_DONE, &device->flags);
+ /* and also set RS_PROGRESS below */
+ else if (!lazy_bitmap_update_due(device))
return;
- }

- if (!get_ldev(device))
- return; /* no disk, no metadata, no bitmap to clear bits in */
-
- nr_sectors = drbd_get_capacity(device->this_bdev);
- esector = sector + (size >> 9) - 1;
-
- if (!expect(sector < nr_sectors))
- goto out;
- if (!expect(esector < nr_sectors))
- esector = nr_sectors - 1;
-
- lbnr = BM_SECT_TO_BIT(nr_sectors-1);
-
- /* we clear it (in sync).
- * round up start sector, round down end sector. we make sure we only
- * clear full, aligned, BM_BLOCK_SIZE (4K) blocks */
- if (unlikely(esector < BM_SECT_PER_BIT-1))
- goto out;
- if (unlikely(esector == (nr_sectors-1)))
- ebnr = lbnr;
- else
- ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1));
- sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1);
-
- if (sbnr > ebnr)
- goto out;
+ /* compare with test_and_clear_bit() calls in and above
+ * try_update_all_on_disk_bitmaps() from the drbd_worker(). */
+ if (test_and_set_bit(RS_PROGRESS, &device->flags))
+ return;
+ connection = first_peer_device(device)->connection;
+ if (!test_and_set_bit(CONN_RS_PROGRESS, &connection->flags))
+ wake_up(&connection->sender_work.q_wait);
+}

+static int update_sync_bits(struct drbd_device *device,
+ unsigned long sbnr, unsigned long ebnr,
+ enum update_sync_bits_mode mode)
+{
/*
- * ok, (capacity & 7) != 0 sometimes, but who cares...
- * we count rs_{total,left} in bits, not sectors.
+ * We keep a count of set bits per resync-extent in the ->rs_left
+ * caching member, so we need to loop and work within the resync extent
+ * alignment. Typically this loop will execute exactly once.
*/
- count = drbd_bm_clear_bits(device, sbnr, ebnr);
- if (count) {
- drbd_advance_rs_marks(device, drbd_bm_total_weight(device));
- spin_lock_irqsave(&device->al_lock, flags);
- drbd_try_clear_on_disk_bm(device, sector, count, true);
- spin_unlock_irqrestore(&device->al_lock, flags);
-
- /* just wake_up unconditional now, various lc_chaged(),
- * lc_put() in drbd_try_clear_on_disk_bm(). */
- wake_up = 1;
+ unsigned long flags;
+ unsigned long count = 0;
+ unsigned int cleared = 0;
+ while (sbnr <= ebnr) {
+ /* set temporary boundary bit number to last bit number within
+ * the resync extent of the current start bit number,
+ * but cap at provided end bit number */
+ unsigned long tbnr = min(ebnr, sbnr | BM_BLOCKS_PER_BM_EXT_MASK);
+ unsigned long c;
+
+ if (mode == RECORD_RS_FAILED)
+ /* Only called from drbd_rs_failed_io(), bits
+ * supposedly still set. Recount, maybe some
+ * of the bits have been successfully cleared
+ * by application IO meanwhile.
+ */
+ c = drbd_bm_count_bits(device, sbnr, tbnr);
+ else if (mode == SET_IN_SYNC)
+ c = drbd_bm_clear_bits(device, sbnr, tbnr);
+ else /* if (mode == SET_OUT_OF_SYNC) */
+ c = drbd_bm_set_bits(device, sbnr, tbnr);
+
+ if (c) {
+ spin_lock_irqsave(&device->al_lock, flags);
+ cleared += update_rs_extent(device, BM_BIT_TO_EXT(sbnr), c, mode);
+ spin_unlock_irqrestore(&device->al_lock, flags);
+ count += c;
+ }
+ sbnr = tbnr + 1;
}
-out:
- put_ldev(device);
- if (wake_up)
+ if (count) {
+ if (mode == SET_IN_SYNC) {
+ unsigned long still_to_go = drbd_bm_total_weight(device);
+ bool rs_is_done = (still_to_go <= device->rs_failed);
+ drbd_advance_rs_marks(device, still_to_go);
+ if (cleared || rs_is_done)
+ maybe_schedule_on_disk_bitmap_update(device, rs_is_done);
+ } else if (mode == RECORD_RS_FAILED)
+ device->rs_failed += count;
wake_up(&device->al_wait);
+ }
+ return count;
}

-/*
- * this is intended to set one request worth of data out of sync.
- * affects at least 1 bit,
- * and at most 1+DRBD_MAX_BIO_SIZE/BM_BLOCK_SIZE bits.
+/* clear the bit corresponding to the piece of storage in question:
+ * size byte of data starting from sector. Only clear a bits of the affected
+ * one ore more _aligned_ BM_BLOCK_SIZE blocks.
+ *
+ * called by worker on C_SYNC_TARGET and receiver on SyncSource.
*
- * called by tl_clear and drbd_send_dblock (==drbd_make_request).
- * so this can be _any_ process.
*/
-int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector, int size,
- const char *file, const unsigned int line)
+int __drbd_change_sync(struct drbd_device *device, sector_t sector, int size,
+ enum update_sync_bits_mode mode,
+ const char *file, const unsigned int line)
{
- unsigned long sbnr, ebnr, flags;
+ /* Is called from worker and receiver context _only_ */
+ unsigned long sbnr, ebnr, lbnr;
+ unsigned long count = 0;
sector_t esector, nr_sectors;
- unsigned int enr, count = 0;
- struct lc_element *e;

- /* this should be an empty REQ_FLUSH */
- if (size == 0)
+ /* This would be an empty REQ_FLUSH, be silent. */
+ if ((mode == SET_OUT_OF_SYNC) && size == 0)
return 0;

- if (size < 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
- drbd_err(device, "sector: %llus, size: %d\n",
- (unsigned long long)sector, size);
+ if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
+ drbd_err(device, "%s: sector=%llus size=%d nonsense!\n",
+ drbd_change_sync_fname[mode],
+ (unsigned long long)sector, size);
return 0;
}

if (!get_ldev(device))
- return 0; /* no disk, no metadata, no bitmap to set bits in */
+ return 0; /* no disk, no metadata, no bitmap to manipulate bits in */

nr_sectors = drbd_get_capacity(device->this_bdev);
esector = sector + (size >> 9) - 1;
@@ -874,25 +904,28 @@ int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector, int size
if (!expect(esector < nr_sectors))
esector = nr_sectors - 1;

- /* we set it out of sync,
- * we do not need to round anything here */
- sbnr = BM_SECT_TO_BIT(sector);
- ebnr = BM_SECT_TO_BIT(esector);
-
- /* ok, (capacity & 7) != 0 sometimes, but who cares...
- * we count rs_{total,left} in bits, not sectors. */
- spin_lock_irqsave(&device->al_lock, flags);
- count = drbd_bm_set_bits(device, sbnr, ebnr);
+ lbnr = BM_SECT_TO_BIT(nr_sectors-1);

- enr = BM_SECT_TO_EXT(sector);
- e = lc_find(device->resync, enr);
- if (e)
- lc_entry(e, struct bm_extent, lce)->rs_left += count;
- spin_unlock_irqrestore(&device->al_lock, flags);
+ if (mode == SET_IN_SYNC) {
+ /* Round up start sector, round down end sector. We make sure
+ * we only clear full, aligned, BM_BLOCK_SIZE blocks. */
+ if (unlikely(esector < BM_SECT_PER_BIT-1))
+ goto out;
+ if (unlikely(esector == (nr_sectors-1)))
+ ebnr = lbnr;
+ else
+ ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1));
+ sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1);
+ } else {
+ /* We set it out of sync, or record resync failure.
+ * Should not round anything here. */
+ sbnr = BM_SECT_TO_BIT(sector);
+ ebnr = BM_SECT_TO_BIT(esector);
+ }

+ count = update_sync_bits(device, sbnr, ebnr, mode);
out:
put_ldev(device);
-
return count;
}

@@ -1209,69 +1242,3 @@ int drbd_rs_del_all(struct drbd_device *device)

return 0;
}
-
-/**
- * drbd_rs_failed_io() - Record information on a failure to resync the specified blocks
- * @device: DRBD device.
- * @sector: The sector number.
- * @size: Size of failed IO operation, in byte.
- */
-void drbd_rs_failed_io(struct drbd_device *device, sector_t sector, int size)
-{
- /* Is called from worker and receiver context _only_ */
- unsigned long sbnr, ebnr, lbnr;
- unsigned long count;
- sector_t esector, nr_sectors;
- int wake_up = 0;
-
- if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
- drbd_err(device, "drbd_rs_failed_io: sector=%llus size=%d nonsense!\n",
- (unsigned long long)sector, size);
- return;
- }
- nr_sectors = drbd_get_capacity(device->this_bdev);
- esector = sector + (size >> 9) - 1;
-
- if (!expect(sector < nr_sectors))
- return;
- if (!expect(esector < nr_sectors))
- esector = nr_sectors - 1;
-
- lbnr = BM_SECT_TO_BIT(nr_sectors-1);
-
- /*
- * round up start sector, round down end sector. we make sure we only
- * handle full, aligned, BM_BLOCK_SIZE (4K) blocks */
- if (unlikely(esector < BM_SECT_PER_BIT-1))
- return;
- if (unlikely(esector == (nr_sectors-1)))
- ebnr = lbnr;
- else
- ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1));
- sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1);
-
- if (sbnr > ebnr)
- return;
-
- /*
- * ok, (capacity & 7) != 0 sometimes, but who cares...
- * we count rs_{total,left} in bits, not sectors.
- */
- spin_lock_irq(&device->al_lock);
- count = drbd_bm_count_bits(device, sbnr, ebnr);
- if (count) {
- device->rs_failed += count;
-
- if (get_ldev(device)) {
- drbd_try_clear_on_disk_bm(device, sector, count, false);
- put_ldev(device);
- }
-
- /* just wake_up unconditional now, various lc_chaged(),
- * lc_put() in drbd_try_clear_on_disk_bm(). */
- wake_up = 1;
- }
- spin_unlock_irq(&device->al_lock);
- if (wake_up)
- wake_up(&device->al_wait);
-}
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index eb002a7..a16f9ae 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -432,7 +432,11 @@ enum {
* goes into C_CONNECTED state. */
CONSIDER_RESYNC,

+ RS_PROGRESS, /* tell worker that resync made significant progress */
+ RS_DONE, /* tell worker that resync is done */
+
MD_NO_FUA, /* Users wants us to not use FUA/FLUSH on meta data dev */
+
SUSPEND_IO, /* suspend application io */
BITMAP_IO, /* suspend application io;
once no more io in flight, start bitmap io */
@@ -577,6 +581,7 @@ enum {
* and potentially deadlock on, this drbd worker.
*/
DISCONNECT_SENT,
+ CONN_RS_PROGRESS, /* tell worker that resync made significant progress */
};

struct drbd_resource {
@@ -1106,17 +1111,21 @@ struct bm_extent {
/* in which _bitmap_ extent (resp. sector) the bit for a certain
* _storage_ sector is located in */
#define BM_SECT_TO_EXT(x) ((x)>>(BM_EXT_SHIFT-9))
+#define BM_BIT_TO_EXT(x) ((x) >> (BM_EXT_SHIFT - BM_BLOCK_SHIFT))

-/* how much _storage_ sectors we have per bitmap sector */
+/* first storage sector a bitmap extent corresponds to */
#define BM_EXT_TO_SECT(x) ((sector_t)(x) << (BM_EXT_SHIFT-9))
+/* how much _storage_ sectors we have per bitmap extent */
#define BM_SECT_PER_EXT BM_EXT_TO_SECT(1)
+/* how many bits are covered by one bitmap extent (resync extent) */
+#define BM_BITS_PER_EXT (1UL << (BM_EXT_SHIFT - BM_BLOCK_SHIFT))
+
+#define BM_BLOCKS_PER_BM_EXT_MASK (BM_BITS_PER_EXT - 1)
+

/* in one sector of the bitmap, we have this many activity_log extents. */
#define AL_EXT_PER_BM_SECT (1 << (BM_EXT_SHIFT - AL_EXTENT_SHIFT))

-#define BM_BLOCKS_PER_BM_EXT_B (BM_EXT_SHIFT - BM_BLOCK_SHIFT)
-#define BM_BLOCKS_PER_BM_EXT_MASK ((1<<BM_BLOCKS_PER_BM_EXT_B) - 1)
-
/* the extent in "PER_EXTENT" below is an activity log extent
* we need that many (long words/bytes) to store the bitmap
* of one AL_EXTENT_SIZE chunk of storage.
@@ -1214,7 +1223,6 @@ extern unsigned long _drbd_bm_find_next(struct drbd_device *device, unsigned lon
extern unsigned long _drbd_bm_find_next_zero(struct drbd_device *device, unsigned long bm_fo);
extern unsigned long _drbd_bm_total_weight(struct drbd_device *device);
extern unsigned long drbd_bm_total_weight(struct drbd_device *device);
-extern int drbd_bm_rs_done(struct drbd_device *device);
/* for receive_bitmap */
extern void drbd_bm_merge_lel(struct drbd_device *device, size_t offset,
size_t number, unsigned long *buffer);
@@ -1503,14 +1511,17 @@ extern int drbd_rs_del_all(struct drbd_device *device);
extern void drbd_rs_failed_io(struct drbd_device *device,
sector_t sector, int size);
extern void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go);
-extern void __drbd_set_in_sync(struct drbd_device *device, sector_t sector,
- int size, const char *file, const unsigned int line);
+
+enum update_sync_bits_mode { RECORD_RS_FAILED, SET_OUT_OF_SYNC, SET_IN_SYNC };
+extern int __drbd_change_sync(struct drbd_device *device, sector_t sector, int size,
+ enum update_sync_bits_mode mode,
+ const char *file, const unsigned int line);
#define drbd_set_in_sync(device, sector, size) \
- __drbd_set_in_sync(device, sector, size, __FILE__, __LINE__)
-extern int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector,
- int size, const char *file, const unsigned int line);
+ __drbd_change_sync(device, sector, size, SET_IN_SYNC, __FILE__, __LINE__)
#define drbd_set_out_of_sync(device, sector, size) \
- __drbd_set_out_of_sync(device, sector, size, __FILE__, __LINE__)
+ __drbd_change_sync(device, sector, size, SET_OUT_OF_SYNC, __FILE__, __LINE__)
+#define drbd_rs_failed_io(device, sector, size) \
+ __drbd_change_sync(device, sector, size, RECORD_RS_FAILED, __FILE__, __LINE__)
extern void drbd_al_shrink(struct drbd_device *device);
extern int drbd_initialize_al(struct drbd_device *, void *);

@@ -1915,6 +1926,15 @@ static inline void _sub_unacked(struct drbd_device *device, int n, const char *f
ERR_IF_CNT_IS_NEGATIVE(unacked_cnt, func, line);
}

+static inline bool is_sync_state(enum drbd_conns connection_state)
+{
+ return
+ (connection_state == C_SYNC_SOURCE
+ || connection_state == C_SYNC_TARGET
+ || connection_state == C_PAUSED_SYNC_S
+ || connection_state == C_PAUSED_SYNC_T);
+}
+
/**
* get_ldev() - Increase the ref count on device->ldev. Returns 0 if there is no ldev
* @M: DRBD device.
diff --git a/drivers/block/drbd/drbd_state.c b/drivers/block/drbd/drbd_state.c
index 19da7c7..1bddd6c 100644
--- a/drivers/block/drbd/drbd_state.c
+++ b/drivers/block/drbd/drbd_state.c
@@ -1011,6 +1011,9 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
atomic_inc(&device->local_cnt);

did_remote = drbd_should_do_remote(device->state);
+ if (!is_sync_state(os.conn) && is_sync_state(ns.conn))
+ clear_bit(RS_DONE, &device->flags);
+
device->state.i = ns.i;
should_do_remote = drbd_should_do_remote(device->state);
device->resource->susp = ns.susp;
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 47bc840..bafb62e 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1740,11 +1740,20 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
device->rs_mark_time[i] = now;
}
_drbd_pause_after(device);
+ /* Forget potentially stale cached per resync extent bit-counts.
+ * Open coded drbd_rs_cancel_all(device), we already have IRQs
+ * disabled, and know the disk state is ok. */
+ spin_lock(&device->al_lock);
+ lc_reset(device->resync);
+ device->resync_locked = 0;
+ device->resync_wenr = LC_FREE;
+ spin_unlock(&device->al_lock);
}
write_unlock(&global_state_lock);
spin_unlock_irq(&device->resource->req_lock);

if (r == SS_SUCCESS) {
+ wake_up(&device->al_wait); /* for lc_reset() above */
/* reset rs_last_bcast when a resync or verify is started,
* to deal with potential jiffies wrap. */
device->rs_last_bcast = jiffies - HZ;
@@ -1807,36 +1816,22 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
static void update_on_disk_bitmap(struct drbd_device *device)
{
struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
+ bool resync_done = test_and_clear_bit(RS_DONE, &device->flags);
device->rs_last_bcast = jiffies;

if (!get_ldev(device))
return;

drbd_bm_write_lazy(device, 0);
- if (drbd_bm_total_weight(device) <= device->rs_failed)
+ if (resync_done && is_sync_state(device->state.conn))
drbd_resync_finished(device);
+
drbd_bcast_event(device, &sib);
/* update timestamp, in case it took a while to write out stuff */
device->rs_last_bcast = jiffies;
put_ldev(device);
}

-bool wants_lazy_bitmap_update(struct drbd_device *device)
-{
- enum drbd_conns connection_state = device->state.conn;
- return
- /* only do a lazy writeout, if device is in some resync state */
- (connection_state == C_SYNC_SOURCE
- || connection_state == C_SYNC_TARGET
- || connection_state == C_PAUSED_SYNC_S
- || connection_state == C_PAUSED_SYNC_T) &&
- /* AND
- * either we just finished, or the last lazy update
- * was some time ago already. */
- (drbd_bm_total_weight(device) <= device->rs_failed
- || time_after(jiffies, device->rs_last_bcast + 2*HZ));
-}
-
static void try_update_all_on_disk_bitmaps(struct drbd_connection *connection)
{
struct drbd_peer_device *peer_device;
@@ -1845,8 +1840,9 @@ static void try_update_all_on_disk_bitmaps(struct drbd_connection *connection)
rcu_read_lock();
idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
struct drbd_device *device = peer_device->device;
- if (!wants_lazy_bitmap_update(device))
+ if (!test_and_clear_bit(RS_PROGRESS, &device->flags))
continue;
+
kref_get(&device->kref);
rcu_read_unlock();
update_on_disk_bitmap(device);
@@ -1930,15 +1926,18 @@ static void wait_for_work(struct drbd_connection *connection, struct list_head *
if (send_barrier)
maybe_send_barrier(connection,
connection->send.current_epoch_nr + 1);
+
+ if (test_bit(CONN_RS_PROGRESS, &connection->flags))
+ break;
+
/* drbd_send() may have called flush_signals() */
if (get_t_state(&connection->worker) != RUNNING)
break;
+
schedule();
/* may be woken up for other things but new work, too,
* e.g. if the current epoch got closed.
* In which case we send the barrier above. */
-
- try_update_all_on_disk_bitmaps(connection);
}
finish_wait(&connection->sender_work.q_wait, &wait);

@@ -1973,6 +1972,9 @@ int drbd_worker(struct drbd_thread *thi)
if (list_empty(&work_list))
wait_for_work(connection, &work_list);

+ if (test_and_clear_bit(CONN_RS_PROGRESS, &connection->flags))
+ try_update_all_on_disk_bitmaps(connection);
+
if (signal_pending(current)) {
flush_signals(current);
if (get_t_state(thi) == RUNNING) {
--
1.9.1

2014-06-30 15:57:03

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 12/18] drbd: trigger tcp_push_pending_frames() for PING and PING_ACK

From: Lars Ellenberg <[email protected]>

This should reduce latency for such in-DRBD-protocol "pings",
and may help reduce spurious disconnect/reconnect cycles due to
"PingAck did not arrive in time."

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_main.c | 5 +++++
1 file changed, 5 insertions(+)

diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 8e0813d..77d34a6 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -662,6 +662,11 @@ static int __send_command(struct drbd_connection *connection, int vnr,
msg_flags);
if (data && !err)
err = drbd_send_all(connection, sock->socket, data, size, 0);
+ /* DRBD protocol "pings" are latency critical.
+ * This is supposed to trigger tcp_push_pending_frames() */
+ if (!err && (cmd == P_PING || cmd == P_PING_ACK))
+ drbd_tcp_nodelay(sock->socket);
+
return err;
}

--
1.9.1

2014-06-30 15:57:00

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 11/18] drbd: re-add lost conf_mutex protection in drbd_set_role

From: Lars Ellenberg <[email protected]>

The conf_update mutex used to be held while clearing the
net_conf->discard_my_data flag inside drbd_set_role.

It was moved into drbd_adm_set_role with
drbd: allow parallel promote/demote actions
but then replaced at that location by the newly introduced adm_mutex with
drbd: Fix a potential deadlock in drbdsetup, introduce resource->adm_mutex

And I simply forgot to put it back in at the original location.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_nl.c | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 133e9b3..23670d8 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -661,11 +661,11 @@ drbd_set_role(struct drbd_device *const device, enum drbd_role new_role, int for
put_ldev(device);
}
} else {
- /* Called from drbd_adm_set_role only.
- * We are still holding the conf_update mutex. */
+ mutex_lock(&device->resource->conf_update);
nc = connection->net_conf;
if (nc)
nc->discard_my_data = 0; /* without copy; single bit op is atomic */
+ mutex_unlock(&device->resource->conf_update);

set_disk_ro(device->vdisk, false);
if (get_ldev(device)) {
--
1.9.1

2014-06-30 16:01:37

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 05/18] drbd: refactor use of first_peer_device()

From: Lars Ellenberg <[email protected]>

Reduce the number of calls to first_peer_device(). Instead, call
first_peer_device() just once to assign a local variable peer_device.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_nl.c | 30 +++++++++++-------
drivers/block/drbd/drbd_receiver.c | 18 ++++++-----
drivers/block/drbd/drbd_req.c | 25 ++++++++-------
drivers/block/drbd/drbd_state.c | 65 +++++++++++++++++++-------------------
drivers/block/drbd/drbd_worker.c | 55 +++++++++++++++++---------------
5 files changed, 105 insertions(+), 88 deletions(-)

diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 25f4b6f..0bf8a60 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -552,8 +552,10 @@ void conn_try_outdate_peer_async(struct drbd_connection *connection)
}

enum drbd_state_rv
-drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
+drbd_set_role(struct drbd_device *const device, enum drbd_role new_role, int force)
{
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
const int max_tries = 4;
enum drbd_state_rv rv = SS_UNKNOWN_ERROR;
struct net_conf *nc;
@@ -601,7 +603,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
device->state.disk == D_CONSISTENT && mask.pdsk == 0) {
D_ASSERT(device, device->state.pdsk == D_UNKNOWN);

- if (conn_try_outdate_peer(first_peer_device(device)->connection)) {
+ if (conn_try_outdate_peer(connection)) {
val.disk = D_UP_TO_DATE;
mask.disk = D_MASK;
}
@@ -611,7 +613,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
if (rv == SS_NOTHING_TO_DO)
goto out;
if (rv == SS_PRIMARY_NOP && mask.pdsk == 0) {
- if (!conn_try_outdate_peer(first_peer_device(device)->connection) && force) {
+ if (!conn_try_outdate_peer(connection) && force) {
drbd_warn(device, "Forced into split brain situation!\n");
mask.pdsk = D_MASK;
val.pdsk = D_OUTDATED;
@@ -624,7 +626,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
retry at most once more in this case. */
int timeo;
rcu_read_lock();
- nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+ nc = rcu_dereference(connection->net_conf);
timeo = nc ? (nc->ping_timeo + 1) * HZ / 10 : 1;
rcu_read_unlock();
schedule_timeout_interruptible(timeo);
@@ -661,7 +663,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
} else {
/* Called from drbd_adm_set_role only.
* We are still holding the conf_update mutex. */
- nc = first_peer_device(device)->connection->net_conf;
+ nc = connection->net_conf;
if (nc)
nc->discard_my_data = 0; /* without copy; single bit op is atomic */

@@ -683,8 +685,8 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
if (device->state.conn >= C_WF_REPORT_PARAMS) {
/* if this was forced, we should consider sync */
if (forced)
- drbd_send_uuids(first_peer_device(device));
- drbd_send_current_state(first_peer_device(device));
+ drbd_send_uuids(peer_device);
+ drbd_send_current_state(peer_device);
}

drbd_md_sync(device);
@@ -1433,6 +1435,8 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
{
struct drbd_config_context adm_ctx;
struct drbd_device *device;
+ struct drbd_peer_device *peer_device;
+ struct drbd_connection *connection;
int err;
enum drbd_ret_code retcode;
enum determine_dev_size dd;
@@ -1455,7 +1459,9 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)

device = adm_ctx.device;
mutex_lock(&adm_ctx.resource->adm_mutex);
- conn_reconfig_start(first_peer_device(device)->connection);
+ peer_device = first_peer_device(device);
+ connection = peer_device ? peer_device->connection : NULL;
+ conn_reconfig_start(connection);

/* if you want to reconfigure, please tear down first */
if (device->state.disk > D_DISKLESS) {
@@ -1522,7 +1528,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
goto fail;

rcu_read_lock();
- nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+ nc = rcu_dereference(connection->net_conf);
if (nc) {
if (new_disk_conf->fencing == FP_STONITH && nc->wire_protocol == DRBD_PROT_A) {
rcu_read_unlock();
@@ -1642,7 +1648,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
*/
wait_event(device->misc_wait, !atomic_read(&device->ap_pending_cnt) || drbd_suspended(device));
/* and for any other previously queued work */
- drbd_flush_workqueue(&first_peer_device(device)->connection->sender_work);
+ drbd_flush_workqueue(&connection->sender_work);

rv = _drbd_request_state(device, NS(disk, D_ATTACHING), CS_VERBOSE);
retcode = rv; /* FIXME: Type mismatch. */
@@ -1838,7 +1844,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)

kobject_uevent(&disk_to_dev(device->vdisk)->kobj, KOBJ_CHANGE);
put_ldev(device);
- conn_reconfig_done(first_peer_device(device)->connection);
+ conn_reconfig_done(connection);
mutex_unlock(&adm_ctx.resource->adm_mutex);
drbd_adm_finish(&adm_ctx, info, retcode);
return 0;
@@ -1849,7 +1855,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
drbd_force_state(device, NS(disk, D_DISKLESS));
drbd_md_sync(device);
fail:
- conn_reconfig_done(first_peer_device(device)->connection);
+ conn_reconfig_done(connection);
if (nbc) {
if (nbc->backing_bdev)
blkdev_put(nbc->backing_bdev,
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index be0c376..bb1434d 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -2857,8 +2857,10 @@ static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
-1091 requires proto 91
-1096 requires proto 96
*/
-static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
+static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
{
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
u64 self, peer;
int i, j;

@@ -2884,7 +2886,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho

if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {

- if (first_peer_device(device)->connection->agreed_pro_version < 91)
+ if (connection->agreed_pro_version < 91)
return -1091;

if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
@@ -2907,7 +2909,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho

if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {

- if (first_peer_device(device)->connection->agreed_pro_version < 91)
+ if (connection->agreed_pro_version < 91)
return -1091;

if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
@@ -2940,7 +2942,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
case 1: /* self_pri && !peer_pri */ return 1;
case 2: /* !self_pri && peer_pri */ return -1;
case 3: /* self_pri && peer_pri */
- dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
+ dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
return dc ? -1 : 1;
}
}
@@ -2953,14 +2955,14 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
*rule_nr = 51;
peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
if (self == peer) {
- if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
+ if (connection->agreed_pro_version < 96 ?
(device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
(device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
/* The last P_SYNC_UUID did not get though. Undo the last start of
resync as sync source modifications of the peer's UUIDs. */

- if (first_peer_device(device)->connection->agreed_pro_version < 91)
+ if (connection->agreed_pro_version < 91)
return -1091;

device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
@@ -2990,14 +2992,14 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
*rule_nr = 71;
self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
if (self == peer) {
- if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
+ if (connection->agreed_pro_version < 96 ?
(device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
(device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
/* The last P_SYNC_UUID did not get though. Undo the last start of
resync as sync source modifications of our UUIDs. */

- if (first_peer_device(device)->connection->agreed_pro_version < 91)
+ if (connection->agreed_pro_version < 91)
return -1091;

__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index 4c7fee1..042bbc6 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -454,7 +454,9 @@ static void drbd_report_io_error(struct drbd_device *device, struct drbd_request
int __req_mod(struct drbd_request *req, enum drbd_req_event what,
struct bio_and_error *m)
{
- struct drbd_device *device = req->device;
+ struct drbd_device *const device = req->device;
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
struct net_conf *nc;
int p, rv = 0;

@@ -477,7 +479,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
* and from w_read_retry_remote */
D_ASSERT(device, !(req->rq_state & RQ_NET_MASK));
rcu_read_lock();
- nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+ nc = rcu_dereference(connection->net_conf);
p = nc->wire_protocol;
rcu_read_unlock();
req->rq_state |=
@@ -549,7 +551,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
D_ASSERT(device, (req->rq_state & RQ_LOCAL_MASK) == 0);
mod_rq_state(req, m, 0, RQ_NET_QUEUED);
req->w.cb = w_send_read_req;
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ drbd_queue_work(&connection->sender_work,
&req->w);
break;

@@ -585,23 +587,23 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
req->w.cb = w_send_dblock;
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ drbd_queue_work(&connection->sender_work,
&req->w);

/* close the epoch, in case it outgrew the limit */
rcu_read_lock();
- nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+ nc = rcu_dereference(connection->net_conf);
p = nc->max_epoch_size;
rcu_read_unlock();
- if (first_peer_device(device)->connection->current_tle_writes >= p)
- start_new_tl_epoch(first_peer_device(device)->connection);
+ if (connection->current_tle_writes >= p)
+ start_new_tl_epoch(connection);

break;

case QUEUE_FOR_SEND_OOS:
mod_rq_state(req, m, 0, RQ_NET_QUEUED);
req->w.cb = w_send_out_of_sync;
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ drbd_queue_work(&connection->sender_work,
&req->w);
break;

@@ -714,7 +716,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,

get_ldev(device); /* always succeeds in this call path */
req->w.cb = w_restart_disk_io;
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ drbd_queue_work(&connection->sender_work,
&req->w);
break;

@@ -736,7 +738,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,

mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
if (req->w.cb) {
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ /* w.cb expected to be w_send_dblock, or w_send_read_req */
+ drbd_queue_work(&connection->sender_work,
&req->w);
rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
} /* else: FIXME can this happen? */
@@ -769,7 +772,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
break;

case QUEUE_AS_DRBD_BARRIER:
- start_new_tl_epoch(first_peer_device(device)->connection);
+ start_new_tl_epoch(connection);
mod_rq_state(req, m, 0, RQ_NET_OK|RQ_NET_DONE);
break;
};
diff --git a/drivers/block/drbd/drbd_state.c b/drivers/block/drbd/drbd_state.c
index a5d8aae..19da7c7 100644
--- a/drivers/block/drbd/drbd_state.c
+++ b/drivers/block/drbd/drbd_state.c
@@ -952,6 +952,8 @@ enum drbd_state_rv
__drbd_set_state(struct drbd_device *device, union drbd_state ns,
enum chg_state_flags flags, struct completion *done)
{
+ struct drbd_peer_device *peer_device = first_peer_device(device);
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
union drbd_state os;
enum drbd_state_rv rv = SS_SUCCESS;
enum sanitize_state_warnings ssw;
@@ -978,9 +980,9 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
this happen...*/

if (is_valid_state(device, os) == rv)
- rv = is_valid_soft_transition(os, ns, first_peer_device(device)->connection);
+ rv = is_valid_soft_transition(os, ns, connection);
} else
- rv = is_valid_soft_transition(os, ns, first_peer_device(device)->connection);
+ rv = is_valid_soft_transition(os, ns, connection);
}

if (rv < SS_SUCCESS) {
@@ -997,7 +999,7 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
sanitize_state(). Only display it here if we where not called from
_conn_request_state() */
if (!(flags & CS_DC_SUSP))
- conn_pr_state_change(first_peer_device(device)->connection, os, ns,
+ conn_pr_state_change(connection, os, ns,
(flags & ~CS_DC_MASK) | CS_DC_SUSP);

/* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
@@ -1017,19 +1019,19 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,

/* put replicated vs not-replicated requests in seperate epochs */
if (did_remote != should_do_remote)
- start_new_tl_epoch(first_peer_device(device)->connection);
+ start_new_tl_epoch(connection);

if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING)
drbd_print_uuids(device, "attached to UUIDs");

/* Wake up role changes, that were delayed because of connection establishing */
if (os.conn == C_WF_REPORT_PARAMS && ns.conn != C_WF_REPORT_PARAMS &&
- no_peer_wf_report_params(first_peer_device(device)->connection))
- clear_bit(STATE_SENT, &first_peer_device(device)->connection->flags);
+ no_peer_wf_report_params(connection))
+ clear_bit(STATE_SENT, &connection->flags);

wake_up(&device->misc_wait);
wake_up(&device->state_wait);
- wake_up(&first_peer_device(device)->connection->ping_wait);
+ wake_up(&connection->ping_wait);

/* Aborted verify run, or we reached the stop sector.
* Log the last position, unless end-of-device. */
@@ -1118,21 +1120,21 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,

/* Receiver should clean up itself */
if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
- drbd_thread_stop_nowait(&first_peer_device(device)->connection->receiver);
+ drbd_thread_stop_nowait(&connection->receiver);

/* Now the receiver finished cleaning up itself, it should die */
if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
- drbd_thread_stop_nowait(&first_peer_device(device)->connection->receiver);
+ drbd_thread_stop_nowait(&connection->receiver);

/* Upon network failure, we need to restart the receiver. */
if (os.conn > C_WF_CONNECTION &&
ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
- drbd_thread_restart_nowait(&first_peer_device(device)->connection->receiver);
+ drbd_thread_restart_nowait(&connection->receiver);

/* Resume AL writing if we get a connection */
if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
drbd_resume_al(device);
- first_peer_device(device)->connection->connect_cnt++;
+ connection->connect_cnt++;
}

/* remember last attach time so request_timer_fn() won't
@@ -1150,7 +1152,7 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
ascw->w.cb = w_after_state_ch;
ascw->device = device;
ascw->done = done;
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ drbd_queue_work(&connection->sender_work,
&ascw->w);
} else {
drbd_err(device, "Could not kmalloc an ascw\n");
@@ -1222,6 +1224,8 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
union drbd_state ns, enum chg_state_flags flags)
{
struct drbd_resource *resource = device->resource;
+ struct drbd_peer_device *peer_device = first_peer_device(device);
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
struct sib_info sib;

sib.sib_reason = SIB_STATE_CHANGE;
@@ -1245,7 +1249,6 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
state change. This function might sleep */

if (ns.susp_nod) {
- struct drbd_connection *connection = first_peer_device(device)->connection;
enum drbd_req_event what = NOTHING;

spin_lock_irq(&device->resource->req_lock);
@@ -1267,8 +1270,6 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
}

if (ns.susp_fen) {
- struct drbd_connection *connection = first_peer_device(device)->connection;
-
spin_lock_irq(&device->resource->req_lock);
if (resource->susp_fen && conn_lowest_conn(connection) >= C_CONNECTED) {
/* case2: The connection was established again: */
@@ -1294,8 +1295,8 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
* which is unexpected. */
if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) &&
(ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) &&
- first_peer_device(device)->connection->agreed_pro_version >= 96 && get_ldev(device)) {
- drbd_gen_and_send_sync_uuid(first_peer_device(device));
+ connection->agreed_pro_version >= 96 && get_ldev(device)) {
+ drbd_gen_and_send_sync_uuid(peer_device);
put_ldev(device);
}

@@ -1309,8 +1310,8 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
atomic_set(&device->rs_pending_cnt, 0);
drbd_rs_cancel_all(device);

- drbd_send_uuids(first_peer_device(device));
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_uuids(peer_device);
+ drbd_send_state(peer_device, ns);
}
/* No point in queuing send_bitmap if we don't have a connection
* anymore, so check also the _current_ state, not only the new state
@@ -1335,7 +1336,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
set_bit(NEW_CUR_UUID, &device->flags);
} else {
drbd_uuid_new_current(device);
- drbd_send_uuids(first_peer_device(device));
+ drbd_send_uuids(peer_device);
}
}
put_ldev(device);
@@ -1346,7 +1347,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
if (os.peer == R_SECONDARY && ns.peer == R_PRIMARY &&
device->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
drbd_uuid_new_current(device);
- drbd_send_uuids(first_peer_device(device));
+ drbd_send_uuids(peer_device);
}
/* D_DISKLESS Peer becomes secondary */
if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
@@ -1373,16 +1374,16 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
/* Last part of the attaching process ... */
if (ns.conn >= C_CONNECTED &&
os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
- drbd_send_sizes(first_peer_device(device), 0, 0); /* to start sync... */
- drbd_send_uuids(first_peer_device(device));
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_sizes(peer_device, 0, 0); /* to start sync... */
+ drbd_send_uuids(peer_device);
+ drbd_send_state(peer_device, ns);
}

/* We want to pause/continue resync, tell peer. */
if (ns.conn >= C_CONNECTED &&
((os.aftr_isp != ns.aftr_isp) ||
(os.user_isp != ns.user_isp)))
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);

/* In case one of the isp bits got set, suspend other devices. */
if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
@@ -1392,10 +1393,10 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
/* Make sure the peer gets informed about eventual state
changes (ISP bits) while we were in WFReportParams. */
if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);

if (os.conn != C_AHEAD && ns.conn == C_AHEAD)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);

/* We are in the progress to start a full sync... */
if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
@@ -1449,7 +1450,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
drbd_disk_str(device->state.disk));

if (ns.conn >= C_CONNECTED)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);

drbd_rs_cancel_all(device);

@@ -1473,7 +1474,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
drbd_disk_str(device->state.disk));

if (ns.conn >= C_CONNECTED)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);
/* corresponding get_ldev in __drbd_set_state
* this may finally trigger drbd_ldev_destroy. */
put_ldev(device);
@@ -1481,7 +1482,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,

/* Notify peer that I had a local IO error, and did not detached.. */
if (os.disk == D_UP_TO_DATE && ns.disk == D_INCONSISTENT && ns.conn >= C_CONNECTED)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);

/* Disks got bigger while they were detached */
if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
@@ -1499,14 +1500,14 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
/* sync target done with resync. Explicitly notify peer, even though
* it should (at least for non-empty resyncs) already know itself. */
if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);

/* Verify finished, or reached stop sector. Peer did not know about
* the stop sector, and we may even have changed the stop sector during
* verify to interrupt/stop early. Send the new state. */
if (os.conn == C_VERIFY_S && ns.conn == C_CONNECTED
&& verify_can_do_stop_sector(device))
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);

/* This triggers bitmap writeout of potentially still unwritten pages
* if the resync finished cleanly, or aborted because of peer disk
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index d8f57b6..595ab57 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -583,8 +583,10 @@ static int drbd_rs_number_requests(struct drbd_device *device)
return number;
}

-static int make_resync_request(struct drbd_device *device, int cancel)
+static int make_resync_request(struct drbd_device *const device, int cancel)
{
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
unsigned long bit;
sector_t sector;
const sector_t capacity = drbd_get_capacity(device->this_bdev);
@@ -618,15 +620,15 @@ static int make_resync_request(struct drbd_device *device, int cancel)

for (i = 0; i < number; i++) {
/* Stop generating RS requests, when half of the send buffer is filled */
- mutex_lock(&first_peer_device(device)->connection->data.mutex);
- if (first_peer_device(device)->connection->data.socket) {
- queued = first_peer_device(device)->connection->data.socket->sk->sk_wmem_queued;
- sndbuf = first_peer_device(device)->connection->data.socket->sk->sk_sndbuf;
+ mutex_lock(&connection->data.mutex);
+ if (connection->data.socket) {
+ queued = connection->data.socket->sk->sk_wmem_queued;
+ sndbuf = connection->data.socket->sk->sk_sndbuf;
} else {
queued = 1;
sndbuf = 0;
}
- mutex_unlock(&first_peer_device(device)->connection->data.mutex);
+ mutex_unlock(&connection->data.mutex);
if (queued > sndbuf / 2)
goto requeue;

@@ -696,9 +698,9 @@ next_sector:
/* adjust very last sectors, in case we are oddly sized */
if (sector + (size>>9) > capacity)
size = (capacity-sector)<<9;
- if (first_peer_device(device)->connection->agreed_pro_version >= 89 &&
- first_peer_device(device)->connection->csums_tfm) {
- switch (read_for_csum(first_peer_device(device), sector, size)) {
+ if (connection->agreed_pro_version >= 89 &&
+ connection->csums_tfm) {
+ switch (read_for_csum(peer_device, sector, size)) {
case -EIO: /* Disk failure */
put_ldev(device);
return -EIO;
@@ -717,7 +719,7 @@ next_sector:
int err;

inc_rs_pending(device);
- err = drbd_send_drequest(first_peer_device(device), P_RS_DATA_REQUEST,
+ err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
sector, size, ID_SYNCER);
if (err) {
drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
@@ -1351,7 +1353,8 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_device *device = req->device;
- struct drbd_connection *connection = first_peer_device(device)->connection;
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *const connection = peer_device->connection;
int err;

if (unlikely(cancel)) {
@@ -1365,7 +1368,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
* No more barriers will be sent, until we leave AHEAD mode again. */
maybe_send_barrier(connection, req->epoch);

- err = drbd_send_out_of_sync(first_peer_device(device), req);
+ err = drbd_send_out_of_sync(peer_device, req);
req_mod(req, OOS_HANDED_TO_NETWORK);

return err;
@@ -1380,7 +1383,8 @@ int w_send_dblock(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_device *device = req->device;
- struct drbd_connection *connection = first_peer_device(device)->connection;
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *connection = peer_device->connection;
int err;

if (unlikely(cancel)) {
@@ -1392,7 +1396,7 @@ int w_send_dblock(struct drbd_work *w, int cancel)
maybe_send_barrier(connection, req->epoch);
connection->send.current_epoch_writes++;

- err = drbd_send_dblock(first_peer_device(device), req);
+ err = drbd_send_dblock(peer_device, req);
req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

return err;
@@ -1407,7 +1411,8 @@ int w_send_read_req(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_device *device = req->device;
- struct drbd_connection *connection = first_peer_device(device)->connection;
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *connection = peer_device->connection;
int err;

if (unlikely(cancel)) {
@@ -1419,7 +1424,7 @@ int w_send_read_req(struct drbd_work *w, int cancel)
* if there was any yet. */
maybe_send_barrier(connection, req->epoch);

- err = drbd_send_drequest(first_peer_device(device), P_DATA_REQUEST, req->i.sector, req->i.size,
+ err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
(unsigned long)req);

req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
@@ -1633,6 +1638,8 @@ int w_start_resync(struct drbd_work *w, int cancel)
*/
void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
{
+ struct drbd_peer_device *peer_device = first_peer_device(device);
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
union drbd_state ns;
int r;

@@ -1651,7 +1658,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
if (r > 0) {
drbd_info(device, "before-resync-target handler returned %d, "
"dropping connection.\n", r);
- conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
+ conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
return;
}
} else /* C_SYNC_SOURCE */ {
@@ -1664,7 +1671,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
} else {
drbd_info(device, "before-resync-source handler returned %d, "
"dropping connection.\n", r);
- conn_request_state(first_peer_device(device)->connection,
+ conn_request_state(connection,
NS(conn, C_DISCONNECTING), CS_HARD);
return;
}
@@ -1672,7 +1679,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
}
}

- if (current == first_peer_device(device)->connection->worker.task) {
+ if (current == connection->worker.task) {
/* The worker should not sleep waiting for state_mutex,
that can take long */
if (!mutex_trylock(device->state_mutex)) {
@@ -1756,12 +1763,10 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
* drbd_resync_finished from here in that case.
* We drbd_gen_and_send_sync_uuid here for protocol < 96,
* and from after_state_ch otherwise. */
- if (side == C_SYNC_SOURCE &&
- first_peer_device(device)->connection->agreed_pro_version < 96)
- drbd_gen_and_send_sync_uuid(first_peer_device(device));
+ if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
+ drbd_gen_and_send_sync_uuid(peer_device);

- if (first_peer_device(device)->connection->agreed_pro_version < 95 &&
- device->rs_total == 0) {
+ if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
/* This still has a race (about when exactly the peers
* detect connection loss) that can lead to a full sync
* on next handshake. In 8.3.9 we fixed this with explicit
@@ -1777,7 +1782,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
int timeo;

rcu_read_lock();
- nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+ nc = rcu_dereference(connection->net_conf);
timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
rcu_read_unlock();
schedule_timeout_interruptible(timeo);
--
1.9.1

2014-06-30 16:01:36

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 08/18] drbd: fix a race stopping the worker thread

From: Lars Ellenberg <[email protected]>

We may implicitly call drbd_send() from inside wait_for_work(),
via maybe_send_barrier().

If the "stop" signal was send just before that, drbd_send() would call
flush_signals(), and we would run an unbounded schedule() afterwards.

Fix: check for thread_state == RUNNING before we schedule()

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_worker.c | 3 +++
1 file changed, 3 insertions(+)

diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 5915885..47bc840 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1930,6 +1930,9 @@ static void wait_for_work(struct drbd_connection *connection, struct list_head *
if (send_barrier)
maybe_send_barrier(connection,
connection->send.current_epoch_nr + 1);
+ /* drbd_send() may have called flush_signals() */
+ if (get_t_state(&connection->worker) != RUNNING)
+ break;
schedule();
/* may be woken up for other things but new work, too,
* e.g. if the current epoch got closed.
--
1.9.1

2014-06-30 16:01:34

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 06/18] drbd: allow write-ordering policy to be bumped up again

From: Lars Ellenberg <[email protected]>

Previously, once you disabled flushes as a means of enforcing
write-ordering, you'd need to detach/re-attach to enable them again.

Allow drbdsetup disk-options to re-enable previously disabled
write-ordering policy options at runtime.

While at it fix RCU in drbd_bump_write_ordering()
max_allowed_wo() uses rcu_dereference, therefore it must
be called within rcu_read_lock()/rcu_read_unlock()

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_nl.c | 10 +++++++++-
drivers/block/drbd/drbd_receiver.c | 6 ++++--
2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 0bf8a60..66065e6 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -1294,6 +1294,13 @@ static unsigned int drbd_al_extents_max(struct drbd_backing_dev *bdev)
return (al_size_4k - 1) * AL_CONTEXT_PER_TRANSACTION;
}

+static bool write_ordering_changed(struct disk_conf *a, struct disk_conf *b)
+{
+ return a->disk_barrier != b->disk_barrier ||
+ a->disk_flushes != b->disk_flushes ||
+ a->disk_drain != b->disk_drain;
+}
+
int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info)
{
struct drbd_config_context adm_ctx;
@@ -1400,7 +1407,8 @@ int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info)
else
set_bit(MD_NO_FUA, &device->flags);

- drbd_bump_write_ordering(device->resource, NULL, WO_bdev_flush);
+ if (write_ordering_changed(old_disk_conf, new_disk_conf))
+ drbd_bump_write_ordering(device->resource, NULL, WO_bdev_flush);

drbd_md_sync(device);

diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index bb1434d..8d8a6b7 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -1290,7 +1290,8 @@ void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backin
};

pwo = resource->write_ordering;
- wo = min(pwo, wo);
+ if (wo != WO_bdev_flush)
+ wo = min(pwo, wo);
rcu_read_lock();
idr_for_each_entry(&resource->devices, device, vnr) {
if (get_ldev(device)) {
@@ -1300,11 +1301,12 @@ void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backin
put_ldev(device);
}
}
- rcu_read_unlock();

if (bdev)
wo = max_allowed_wo(bdev, wo);

+ rcu_read_unlock();
+
resource->write_ordering = wo;
if (pwo != resource->write_ordering || wo == WO_bdev_flush)
drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
--
1.9.1

2014-06-30 16:01:33

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 07/18] drbd: get rid of atomic update on disk bitmap works

From: Lars Ellenberg <[email protected]>

Just trigger the occasional lazy bitmap write-out during resync
from the central wait_for_work() helper.

Previously, during resync, bitmap pages would be written out separately,
synchronously, one at a time, at least 8 times each (every 512 bytes
worth of bitmap cleared).

Now we trigger "merge friendly" bulk write out of all cleared pages
every two seconds during resync, and once the resync is finished.
Most pages will be written out only once.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_actlog.c | 63 +---------------------------------------
drivers/block/drbd/drbd_bitmap.c | 55 -----------------------------------
drivers/block/drbd/drbd_int.h | 2 +-
drivers/block/drbd/drbd_nl.c | 7 -----
drivers/block/drbd/drbd_worker.c | 54 ++++++++++++++++++++++++++++++++++
5 files changed, 56 insertions(+), 125 deletions(-)

diff --git a/drivers/block/drbd/drbd_actlog.c b/drivers/block/drbd/drbd_actlog.c
index 05a1780..9c42edf 100644
--- a/drivers/block/drbd/drbd_actlog.c
+++ b/drivers/block/drbd/drbd_actlog.c
@@ -92,12 +92,6 @@ struct __packed al_transaction_on_disk {
__be32 context[AL_CONTEXT_PER_TRANSACTION];
};

-struct update_odbm_work {
- struct drbd_work w;
- struct drbd_device *device;
- unsigned int enr;
-};
-
struct update_al_work {
struct drbd_work w;
struct drbd_device *device;
@@ -452,15 +446,6 @@ static unsigned int al_extent_to_bm_page(unsigned int al_enr)
(AL_EXTENT_SHIFT - BM_BLOCK_SHIFT));
}

-static unsigned int rs_extent_to_bm_page(unsigned int rs_enr)
-{
- return rs_enr >>
- /* bit to page */
- ((PAGE_SHIFT + 3) -
- /* resync extent number to bit */
- (BM_EXT_SHIFT - BM_BLOCK_SHIFT));
-}
-
static sector_t al_tr_number_to_on_disk_sector(struct drbd_device *device)
{
const unsigned int stripes = device->ldev->md.al_stripes;
@@ -682,40 +667,6 @@ int drbd_initialize_al(struct drbd_device *device, void *buffer)
return 0;
}

-static int w_update_odbm(struct drbd_work *w, int unused)
-{
- struct update_odbm_work *udw = container_of(w, struct update_odbm_work, w);
- struct drbd_device *device = udw->device;
- struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
-
- if (!get_ldev(device)) {
- if (__ratelimit(&drbd_ratelimit_state))
- drbd_warn(device, "Can not update on disk bitmap, local IO disabled.\n");
- kfree(udw);
- return 0;
- }
-
- drbd_bm_write_page(device, rs_extent_to_bm_page(udw->enr));
- put_ldev(device);
-
- kfree(udw);
-
- if (drbd_bm_total_weight(device) <= device->rs_failed) {
- switch (device->state.conn) {
- case C_SYNC_SOURCE: case C_SYNC_TARGET:
- case C_PAUSED_SYNC_S: case C_PAUSED_SYNC_T:
- drbd_resync_finished(device);
- default:
- /* nothing to do */
- break;
- }
- }
- drbd_bcast_event(device, &sib);
-
- return 0;
-}
-
-
/* ATTENTION. The AL's extents are 4MB each, while the extents in the
* resync LRU-cache are 16MB each.
* The caller of this function has to hold an get_ldev() reference.
@@ -726,8 +677,6 @@ static void drbd_try_clear_on_disk_bm(struct drbd_device *device, sector_t secto
int count, int success)
{
struct lc_element *e;
- struct update_odbm_work *udw;
-
unsigned int enr;

D_ASSERT(device, atomic_read(&device->local_cnt));
@@ -791,17 +740,7 @@ static void drbd_try_clear_on_disk_bm(struct drbd_device *device, sector_t secto

if (ext->rs_left == ext->rs_failed) {
ext->rs_failed = 0;
-
- udw = kmalloc(sizeof(*udw), GFP_ATOMIC);
- if (udw) {
- udw->enr = ext->lce.lc_number;
- udw->w.cb = w_update_odbm;
- udw->device = device;
- drbd_queue_work_front(&first_peer_device(device)->connection->sender_work,
- &udw->w);
- } else {
- drbd_warn(device, "Could not kmalloc an udw\n");
- }
+ wake_up(&first_peer_device(device)->connection->sender_work.q_wait);
}
} else {
drbd_err(device, "lc_get() failed! locked=%d/%d flags=%lu\n",
diff --git a/drivers/block/drbd/drbd_bitmap.c b/drivers/block/drbd/drbd_bitmap.c
index ed31041..3da6730 100644
--- a/drivers/block/drbd/drbd_bitmap.c
+++ b/drivers/block/drbd/drbd_bitmap.c
@@ -1227,61 +1227,6 @@ int drbd_bm_write_hinted(struct drbd_device *device) __must_hold(local)
return bm_rw(device, WRITE, BM_AIO_WRITE_HINTED | BM_AIO_COPY_PAGES, 0);
}

-/**
- * drbd_bm_write_page() - Writes a PAGE_SIZE aligned piece of bitmap
- * @device: DRBD device.
- * @idx: bitmap page index
- *
- * We don't want to special case on logical_block_size of the backend device,
- * so we submit PAGE_SIZE aligned pieces.
- * Note that on "most" systems, PAGE_SIZE is 4k.
- *
- * In case this becomes an issue on systems with larger PAGE_SIZE,
- * we may want to change this again to write 4k aligned 4k pieces.
- */
-int drbd_bm_write_page(struct drbd_device *device, unsigned int idx) __must_hold(local)
-{
- struct bm_aio_ctx *ctx;
- int err;
-
- if (bm_test_page_unchanged(device->bitmap->bm_pages[idx])) {
- dynamic_drbd_dbg(device, "skipped bm page write for idx %u\n", idx);
- return 0;
- }
-
- ctx = kmalloc(sizeof(struct bm_aio_ctx), GFP_NOIO);
- if (!ctx)
- return -ENOMEM;
-
- *ctx = (struct bm_aio_ctx) {
- .device = device,
- .in_flight = ATOMIC_INIT(1),
- .done = 0,
- .flags = BM_AIO_COPY_PAGES,
- .error = 0,
- .kref = { ATOMIC_INIT(2) },
- };
-
- if (!get_ldev(device)) { /* put is in bm_aio_ctx_destroy() */
- drbd_err(device, "ASSERT FAILED: get_ldev_if_state() == 1 in drbd_bm_write_page()\n");
- kfree(ctx);
- return -ENODEV;
- }
-
- bm_page_io_async(ctx, idx, WRITE_SYNC);
- wait_until_done_or_force_detached(device, device->ldev, &ctx->done);
-
- if (ctx->error)
- drbd_chk_io_error(device, 1, DRBD_META_IO_ERROR);
- /* that causes us to detach, so the in memory bitmap will be
- * gone in a moment as well. */
-
- device->bm_writ_cnt++;
- err = atomic_read(&ctx->in_flight) ? -EIO : ctx->error;
- kref_put(&ctx->kref, &bm_aio_ctx_destroy);
- return err;
-}
-
/* NOTE
* find_first_bit returns int, we return unsigned long.
* For this to work on 32bit arch with bitnumbers > (1<<32),
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index 82ece1b..eb002a7 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -1196,11 +1196,11 @@ extern void _drbd_bm_set_bits(struct drbd_device *device,
const unsigned long s, const unsigned long e);
extern int drbd_bm_test_bit(struct drbd_device *device, unsigned long bitnr);
extern int drbd_bm_e_weight(struct drbd_device *device, unsigned long enr);
-extern int drbd_bm_write_page(struct drbd_device *device, unsigned int idx) __must_hold(local);
extern int drbd_bm_read(struct drbd_device *device) __must_hold(local);
extern void drbd_bm_mark_for_writeout(struct drbd_device *device, int page_nr);
extern int drbd_bm_write(struct drbd_device *device) __must_hold(local);
extern int drbd_bm_write_hinted(struct drbd_device *device) __must_hold(local);
+extern int drbd_bm_write_lazy(struct drbd_device *device, unsigned upper_idx) __must_hold(local);
extern int drbd_bm_write_all(struct drbd_device *device) __must_hold(local);
extern int drbd_bm_write_copy_pages(struct drbd_device *device) __must_hold(local);
extern size_t drbd_bm_words(struct drbd_device *device);
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 66065e6..52221f6 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -3641,13 +3641,6 @@ void drbd_bcast_event(struct drbd_device *device, const struct sib_info *sib)
unsigned seq;
int err = -ENOMEM;

- if (sib->sib_reason == SIB_SYNC_PROGRESS) {
- if (time_after(jiffies, device->rs_last_bcast + HZ))
- device->rs_last_bcast = jiffies;
- else
- return;
- }
-
seq = atomic_inc_return(&drbd_genl_seq);
msg = genlmsg_new(NLMSG_GOODSIZE, GFP_NOIO);
if (!msg)
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 595ab57..5915885 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1804,6 +1804,58 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
mutex_unlock(device->state_mutex);
}

+static void update_on_disk_bitmap(struct drbd_device *device)
+{
+ struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
+ device->rs_last_bcast = jiffies;
+
+ if (!get_ldev(device))
+ return;
+
+ drbd_bm_write_lazy(device, 0);
+ if (drbd_bm_total_weight(device) <= device->rs_failed)
+ drbd_resync_finished(device);
+ drbd_bcast_event(device, &sib);
+ /* update timestamp, in case it took a while to write out stuff */
+ device->rs_last_bcast = jiffies;
+ put_ldev(device);
+}
+
+bool wants_lazy_bitmap_update(struct drbd_device *device)
+{
+ enum drbd_conns connection_state = device->state.conn;
+ return
+ /* only do a lazy writeout, if device is in some resync state */
+ (connection_state == C_SYNC_SOURCE
+ || connection_state == C_SYNC_TARGET
+ || connection_state == C_PAUSED_SYNC_S
+ || connection_state == C_PAUSED_SYNC_T) &&
+ /* AND
+ * either we just finished, or the last lazy update
+ * was some time ago already. */
+ (drbd_bm_total_weight(device) <= device->rs_failed
+ || time_after(jiffies, device->rs_last_bcast + 2*HZ));
+}
+
+static void try_update_all_on_disk_bitmaps(struct drbd_connection *connection)
+{
+ struct drbd_peer_device *peer_device;
+ int vnr;
+
+ rcu_read_lock();
+ idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
+ struct drbd_device *device = peer_device->device;
+ if (!wants_lazy_bitmap_update(device))
+ continue;
+ kref_get(&device->kref);
+ rcu_read_unlock();
+ update_on_disk_bitmap(device);
+ kref_put(&device->kref, drbd_destroy_device);
+ rcu_read_lock();
+ }
+ rcu_read_unlock();
+}
+
static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
spin_lock_irq(&queue->q_lock);
@@ -1882,6 +1934,8 @@ static void wait_for_work(struct drbd_connection *connection, struct list_head *
/* may be woken up for other things but new work, too,
* e.g. if the current epoch got closed.
* In which case we send the barrier above. */
+
+ try_update_all_on_disk_bitmaps(connection);
}
finish_wait(&connection->sender_work.q_wait, &wait);

--
1.9.1

2014-06-30 16:01:32

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 10/18] drbd: stop the meta data sync timer before open coded meta data sync

From: Lars Ellenberg <[email protected]>

If we re-write all meta data due to resize, we have open-coded write-out
of our meta data super block. Stop the md_sync_timer, it would just
trigger scary but in this case spurious "timer expired" messages.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_nl.c | 4 ++++
1 file changed, 4 insertions(+)

diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 52221f6..133e9b3 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -967,6 +967,10 @@ drbd_determine_dev_size(struct drbd_device *device, enum dds_flags flags, struct
if (la_size_changed || md_moved || rs) {
u32 prev_flags;

+ /* We do some synchronous IO below, which may take some time.
+ * Clear the timer, to avoid scary "timer expired!" messages,
+ * "Superblock" is written out at least twice below, anyways. */
+ del_timer(&device->md_sync_timer);
drbd_al_shrink(device); /* All extents inactive. */

prev_flags = md->flags;
--
1.9.1

2014-06-30 16:02:50

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 02/18] drbd: device->ldev is not guaranteed on an D_ATTACHING disk

Some parts of the code assumed that get_ldev_if_state(device, D_ATTACHING)
is sufficient to access the ldev member of the device object. That was
wrong. ldev may not be there or might be freed at any time if the device
has a disk state of D_ATTACHING.

bm_rw()
Documented that drbd_bm_read() is only called from drbd_adm_attach.
drbd_bm_write() is only called when a reference is held, and it is
documented that a caller has to hold a reference before calling
drbd_bm_write()

drbd_bm_write_page()
Use get_ldev() instead of get_ldev_if_state(device, D_ATTACHING)

drbd_bmio_set_n_write()
No longer use get_ldev_if_state(device, D_ATTACHING). All callers
hold a reference to ldev now.

drbd_bmio_clear_n_write()
All callers where holding a reference of ldev anyways. Remove the
misleading get_ldev_if_state(device, D_ATTACHING)

drbd_reconsider_max_bio_size()
Removed the get_ldev_if_state(device, D_ATTACHING). All callers
now pass a struct drbd_backing_dev* when they have a proper
reference, or a NULL pointer.
Before this fix, the receiver could trigger a NULL pointer
deref when in drbd_reconsider_max_bio_size()

drbd_bump_write_ordering()
Used get_ldev_if_state(device, D_ATTACHING) with the wrong assumption.
Remove it, and allow the caller to pass in a struct drbd_backing_dev*
when the caller knows that accessing this bdev is safe.

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_bitmap.c | 4 +++-
drivers/block/drbd/drbd_int.h | 9 ++++----
drivers/block/drbd/drbd_main.c | 36 +++++++++++++------------------
drivers/block/drbd/drbd_nl.c | 41 +++++++++++++++++++++++-------------
drivers/block/drbd/drbd_receiver.c | 43 ++++++++++++++++++++++++++------------
include/linux/drbd.h | 2 +-
6 files changed, 79 insertions(+), 56 deletions(-)

diff --git a/drivers/block/drbd/drbd_bitmap.c b/drivers/block/drbd/drbd_bitmap.c
index 1aa29f8..ed31041 100644
--- a/drivers/block/drbd/drbd_bitmap.c
+++ b/drivers/block/drbd/drbd_bitmap.c
@@ -1085,6 +1085,8 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
kfree(ctx);
return -ENODEV;
}
+ /* Here D_ATTACHING is sufficient since drbd_bm_read() is called only from
+ drbd_adm_attach(), after device->ldev was assigned. */

if (!ctx->flags)
WARN_ON(!(BM_LOCKED_MASK & b->bm_flags));
@@ -1260,7 +1262,7 @@ int drbd_bm_write_page(struct drbd_device *device, unsigned int idx) __must_hold
.kref = { ATOMIC_INIT(2) },
};

- if (!get_ldev_if_state(device, D_ATTACHING)) { /* put is in bm_aio_ctx_destroy() */
+ if (!get_ldev(device)) { /* put is in bm_aio_ctx_destroy() */
drbd_err(device, "ASSERT FAILED: get_ldev_if_state() == 1 in drbd_bm_write_page()\n");
kfree(ctx);
return -ENODEV;
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index 1ef2474..c87bc8e 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -984,8 +984,8 @@ extern int drbd_bitmap_io(struct drbd_device *device,
extern int drbd_bitmap_io_from_worker(struct drbd_device *device,
int (*io_fn)(struct drbd_device *),
char *why, enum bm_flag flags);
-extern int drbd_bmio_set_n_write(struct drbd_device *device);
-extern int drbd_bmio_clear_n_write(struct drbd_device *device);
+extern int drbd_bmio_set_n_write(struct drbd_device *device) __must_hold(local);
+extern int drbd_bmio_clear_n_write(struct drbd_device *device) __must_hold(local);
extern void drbd_ldev_destroy(struct drbd_device *device);

/* Meta data layout
@@ -1313,7 +1313,7 @@ enum determine_dev_size {
extern enum determine_dev_size
drbd_determine_dev_size(struct drbd_device *, enum dds_flags, struct resize_parms *) __must_hold(local);
extern void resync_after_online_grow(struct drbd_device *);
-extern void drbd_reconsider_max_bio_size(struct drbd_device *device);
+extern void drbd_reconsider_max_bio_size(struct drbd_device *device, struct drbd_backing_dev *bdev);
extern enum drbd_state_rv drbd_set_role(struct drbd_device *device,
enum drbd_role new_role,
int force);
@@ -1479,7 +1479,8 @@ static inline void drbd_generic_make_request(struct drbd_device *device,
generic_make_request(bio);
}

-void drbd_bump_write_ordering(struct drbd_resource *resource, enum write_ordering_e wo);
+void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
+ enum write_ordering_e wo);

/* drbd_proc.c */
extern struct proc_dir_entry *drbd_proc;
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 17b9a23..a6af935 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -3466,23 +3466,19 @@ void drbd_uuid_set_bm(struct drbd_device *device, u64 val) __must_hold(local)
*
* Sets all bits in the bitmap and writes the whole bitmap to stable storage.
*/
-int drbd_bmio_set_n_write(struct drbd_device *device)
+int drbd_bmio_set_n_write(struct drbd_device *device) __must_hold(local)
{
int rv = -EIO;

- if (get_ldev_if_state(device, D_ATTACHING)) {
- drbd_md_set_flag(device, MDF_FULL_SYNC);
- drbd_md_sync(device);
- drbd_bm_set_all(device);
-
- rv = drbd_bm_write(device);
+ drbd_md_set_flag(device, MDF_FULL_SYNC);
+ drbd_md_sync(device);
+ drbd_bm_set_all(device);

- if (!rv) {
- drbd_md_clear_flag(device, MDF_FULL_SYNC);
- drbd_md_sync(device);
- }
+ rv = drbd_bm_write(device);

- put_ldev(device);
+ if (!rv) {
+ drbd_md_clear_flag(device, MDF_FULL_SYNC);
+ drbd_md_sync(device);
}

return rv;
@@ -3494,18 +3490,11 @@ int drbd_bmio_set_n_write(struct drbd_device *device)
*
* Clears all bits in the bitmap and writes the whole bitmap to stable storage.
*/
-int drbd_bmio_clear_n_write(struct drbd_device *device)
+int drbd_bmio_clear_n_write(struct drbd_device *device) __must_hold(local)
{
- int rv = -EIO;
-
drbd_resume_al(device);
- if (get_ldev_if_state(device, D_ATTACHING)) {
- drbd_bm_clear_all(device);
- rv = drbd_bm_write(device);
- put_ldev(device);
- }
-
- return rv;
+ drbd_bm_clear_all(device);
+ return drbd_bm_write(device);
}

static int w_bitmap_io(struct drbd_work *w, int unused)
@@ -3603,6 +3592,9 @@ static int w_go_diskless(struct drbd_work *w, int unused)
* that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
* called from worker context. It MUST NOT be used while a previous such
* work is still pending!
+ *
+ * Its worker function encloses the call of io_fn() by get_ldev() and
+ * put_ldev().
*/
void drbd_queue_bitmap_io(struct drbd_device *device,
int (*io_fn)(struct drbd_device *),
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 43fad2c..25f4b6f 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -1110,15 +1110,16 @@ static int drbd_check_al_size(struct drbd_device *device, struct disk_conf *dc)
return 0;
}

-static void drbd_setup_queue_param(struct drbd_device *device, unsigned int max_bio_size)
+static void drbd_setup_queue_param(struct drbd_device *device, struct drbd_backing_dev *bdev,
+ unsigned int max_bio_size)
{
struct request_queue * const q = device->rq_queue;
unsigned int max_hw_sectors = max_bio_size >> 9;
unsigned int max_segments = 0;
struct request_queue *b = NULL;

- if (get_ldev_if_state(device, D_ATTACHING)) {
- b = device->ldev->backing_bdev->bd_disk->queue;
+ if (bdev) {
+ b = bdev->backing_bdev->bd_disk->queue;

max_hw_sectors = min(queue_max_hw_sectors(b), max_bio_size >> 9);
rcu_read_lock();
@@ -1163,11 +1164,10 @@ static void drbd_setup_queue_param(struct drbd_device *device, unsigned int max_
b->backing_dev_info.ra_pages);
q->backing_dev_info.ra_pages = b->backing_dev_info.ra_pages;
}
- put_ldev(device);
}
}

-void drbd_reconsider_max_bio_size(struct drbd_device *device)
+void drbd_reconsider_max_bio_size(struct drbd_device *device, struct drbd_backing_dev *bdev)
{
unsigned int now, new, local, peer;

@@ -1175,10 +1175,9 @@ void drbd_reconsider_max_bio_size(struct drbd_device *device)
local = device->local_max_bio_size; /* Eventually last known value, from volatile memory */
peer = device->peer_max_bio_size; /* Eventually last known value, from meta data */

- if (get_ldev_if_state(device, D_ATTACHING)) {
- local = queue_max_hw_sectors(device->ldev->backing_bdev->bd_disk->queue) << 9;
+ if (bdev) {
+ local = queue_max_hw_sectors(bdev->backing_bdev->bd_disk->queue) << 9;
device->local_max_bio_size = local;
- put_ldev(device);
}
local = min(local, DRBD_MAX_BIO_SIZE);

@@ -1211,7 +1210,7 @@ void drbd_reconsider_max_bio_size(struct drbd_device *device)
if (new != now)
drbd_info(device, "max BIO size = %u\n", new);

- drbd_setup_queue_param(device, new);
+ drbd_setup_queue_param(device, bdev, new);
}

/* Starts the worker thread */
@@ -1399,7 +1398,7 @@ int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info)
else
set_bit(MD_NO_FUA, &device->flags);

- drbd_bump_write_ordering(device->resource, WO_bdev_flush);
+ drbd_bump_write_ordering(device->resource, NULL, WO_bdev_flush);

drbd_md_sync(device);

@@ -1704,7 +1703,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
new_disk_conf = NULL;
new_plan = NULL;

- drbd_bump_write_ordering(device->resource, WO_bdev_flush);
+ drbd_bump_write_ordering(device->resource, device->ldev, WO_bdev_flush);

if (drbd_md_test_flag(device->ldev, MDF_CRASHED_PRIMARY))
set_bit(CRASHED_PRIMARY, &device->flags);
@@ -1720,7 +1719,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
device->read_cnt = 0;
device->writ_cnt = 0;

- drbd_reconsider_max_bio_size(device);
+ drbd_reconsider_max_bio_size(device, device->ldev);

/* If I am currently not R_PRIMARY,
* but meta data primary indicator is set,
@@ -2648,8 +2647,13 @@ int drbd_adm_invalidate(struct sk_buff *skb, struct genl_info *info)
if (retcode != NO_ERROR)
goto out;

- mutex_lock(&adm_ctx.resource->adm_mutex);
device = adm_ctx.device;
+ if (!get_ldev(device)) {
+ retcode = ERR_NO_DISK;
+ goto out;
+ }
+
+ mutex_lock(&adm_ctx.resource->adm_mutex);

/* If there is still bitmap IO pending, probably because of a previous
* resync just being finished, wait for it before requesting a new resync.
@@ -2673,6 +2677,7 @@ int drbd_adm_invalidate(struct sk_buff *skb, struct genl_info *info)
retcode = drbd_request_state(device, NS(conn, C_STARTING_SYNC_T));
drbd_resume_io(device);
mutex_unlock(&adm_ctx.resource->adm_mutex);
+ put_ldev(device);
out:
drbd_adm_finish(&adm_ctx, info, retcode);
return 0;
@@ -2698,7 +2703,7 @@ out:
return 0;
}

-static int drbd_bmio_set_susp_al(struct drbd_device *device)
+static int drbd_bmio_set_susp_al(struct drbd_device *device) __must_hold(local)
{
int rv;

@@ -2719,8 +2724,13 @@ int drbd_adm_invalidate_peer(struct sk_buff *skb, struct genl_info *info)
if (retcode != NO_ERROR)
goto out;

- mutex_lock(&adm_ctx.resource->adm_mutex);
device = adm_ctx.device;
+ if (!get_ldev(device)) {
+ retcode = ERR_NO_DISK;
+ goto out;
+ }
+
+ mutex_lock(&adm_ctx.resource->adm_mutex);

/* If there is still bitmap IO pending, probably because of a previous
* resync just being finished, wait for it before requesting a new resync.
@@ -2747,6 +2757,7 @@ int drbd_adm_invalidate_peer(struct sk_buff *skb, struct genl_info *info)
retcode = drbd_request_state(device, NS(conn, C_STARTING_SYNC_S));
drbd_resume_io(device);
mutex_unlock(&adm_ctx.resource->adm_mutex);
+ put_ldev(device);
out:
drbd_adm_finish(&adm_ctx, info, retcode);
return 0;
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index c708418..be0c376 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -1168,7 +1168,7 @@ static void drbd_flush(struct drbd_connection *connection)
/* would rather check on EOPNOTSUPP, but that is not reliable.
* don't try again for ANY return value != 0
* if (rv == -EOPNOTSUPP) */
- drbd_bump_write_ordering(connection->resource, WO_drain_io);
+ drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io);
}
put_ldev(device);
kref_put(&device->kref, drbd_destroy_device);
@@ -1257,14 +1257,29 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connectio
return rv;
}

+static enum write_ordering_e
+max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
+{
+ struct disk_conf *dc;
+
+ dc = rcu_dereference(bdev->disk_conf);
+
+ if (wo == WO_bdev_flush && !dc->disk_flushes)
+ wo = WO_drain_io;
+ if (wo == WO_drain_io && !dc->disk_drain)
+ wo = WO_none;
+
+ return wo;
+}
+
/**
* drbd_bump_write_ordering() - Fall back to an other write ordering method
* @connection: DRBD connection.
* @wo: Write ordering method to try.
*/
-void drbd_bump_write_ordering(struct drbd_resource *resource, enum write_ordering_e wo)
+void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
+ enum write_ordering_e wo)
{
- struct disk_conf *dc;
struct drbd_device *device;
enum write_ordering_e pwo;
int vnr;
@@ -1278,17 +1293,18 @@ void drbd_bump_write_ordering(struct drbd_resource *resource, enum write_orderin
wo = min(pwo, wo);
rcu_read_lock();
idr_for_each_entry(&resource->devices, device, vnr) {
- if (!get_ldev_if_state(device, D_ATTACHING))
- continue;
- dc = rcu_dereference(device->ldev->disk_conf);
-
- if (wo == WO_bdev_flush && !dc->disk_flushes)
- wo = WO_drain_io;
- if (wo == WO_drain_io && !dc->disk_drain)
- wo = WO_none;
- put_ldev(device);
+ if (get_ldev(device)) {
+ wo = max_allowed_wo(device->ldev, wo);
+ if (device->ldev == bdev)
+ bdev = NULL;
+ put_ldev(device);
+ }
}
rcu_read_unlock();
+
+ if (bdev)
+ wo = max_allowed_wo(bdev, wo);
+
resource->write_ordering = wo;
if (pwo != resource->write_ordering || wo == WO_bdev_flush)
drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
@@ -3709,7 +3725,6 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
}

device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
- drbd_reconsider_max_bio_size(device);
/* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
In case we cleared the QUEUE_FLAG_DISCARD from our queue in
drbd_reconsider_max_bio_size(), we can be sure that after
@@ -3717,6 +3732,7 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info

ddsf = be16_to_cpu(p->dds_flags);
if (get_ldev(device)) {
+ drbd_reconsider_max_bio_size(device, device->ldev);
dd = drbd_determine_dev_size(device, ddsf, NULL);
put_ldev(device);
if (dd == DS_ERROR)
@@ -3724,6 +3740,7 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
drbd_md_sync(device);
} else {
/* I am diskless, need to accept the peer's size. */
+ drbd_reconsider_max_bio_size(device, NULL);
drbd_set_my_capacity(device, p_size);
}

diff --git a/include/linux/drbd.h b/include/linux/drbd.h
index 3dbe9bd..20ec890 100644
--- a/include/linux/drbd.h
+++ b/include/linux/drbd.h
@@ -245,7 +245,7 @@ enum drbd_disk_state {
D_DISKLESS,
D_ATTACHING, /* In the process of reading the meta-data */
D_FAILED, /* Becomes D_DISKLESS as soon as we told it the peer */
- /* when >= D_FAILED it is legal to access mdev->bc */
+ /* when >= D_FAILED it is legal to access mdev->ldev */
D_NEGOTIATING, /* Late attaching state, we need to talk to the peer */
D_INCONSISTENT,
D_OUTDATED,
--
1.9.1

2014-06-30 16:03:08

by Philipp Reisner

[permalink] [raw]
Subject: [PATCH 01/18] drbd: Move write_ordering from connection to resource

Signed-off-by: Philipp Reisner <[email protected]>
Signed-off-by: Lars Ellenberg <[email protected]>
---
drivers/block/drbd/drbd_int.h | 5 +++--
drivers/block/drbd/drbd_main.c | 2 +-
drivers/block/drbd/drbd_nl.c | 4 ++--
drivers/block/drbd/drbd_proc.c | 2 +-
drivers/block/drbd/drbd_receiver.c | 25 ++++++++++++-------------
5 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index a76ceb3..1ef2474 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -594,6 +594,8 @@ struct drbd_resource {
unsigned susp_nod:1; /* IO suspended because no data */
unsigned susp_fen:1; /* IO suspended because fence peer handler runs */

+ enum write_ordering_e write_ordering;
+
cpumask_var_t cpu_mask;
};

@@ -636,7 +638,6 @@ struct drbd_connection {
struct drbd_epoch *current_epoch;
spinlock_t epoch_lock;
unsigned int epochs;
- enum write_ordering_e write_ordering;
atomic_t current_tle_nr; /* transfer log epoch number */
unsigned current_tle_writes; /* writes seen within this tl epoch */

@@ -1478,7 +1479,7 @@ static inline void drbd_generic_make_request(struct drbd_device *device,
generic_make_request(bio);
}

-void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo);
+void drbd_bump_write_ordering(struct drbd_resource *resource, enum write_ordering_e wo);

/* drbd_proc.c */
extern struct proc_dir_entry *drbd_proc;
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 960645c..17b9a23 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -2579,6 +2579,7 @@ struct drbd_resource *drbd_create_resource(const char *name)
kref_init(&resource->kref);
idr_init(&resource->devices);
INIT_LIST_HEAD(&resource->connections);
+ resource->write_ordering = WO_bdev_flush;
list_add_tail_rcu(&resource->resources, &drbd_resources);
mutex_init(&resource->conf_update);
mutex_init(&resource->adm_mutex);
@@ -2617,7 +2618,6 @@ struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts)
INIT_LIST_HEAD(&connection->current_epoch->list);
connection->epochs = 1;
spin_lock_init(&connection->epoch_lock);
- connection->write_ordering = WO_bdev_flush;

connection->send.seen_any_write_yet = false;
connection->send.current_epoch_nr = 0;
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 1b35c45..43fad2c 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -1399,7 +1399,7 @@ int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info)
else
set_bit(MD_NO_FUA, &device->flags);

- drbd_bump_write_ordering(first_peer_device(device)->connection, WO_bdev_flush);
+ drbd_bump_write_ordering(device->resource, WO_bdev_flush);

drbd_md_sync(device);

@@ -1704,7 +1704,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
new_disk_conf = NULL;
new_plan = NULL;

- drbd_bump_write_ordering(first_peer_device(device)->connection, WO_bdev_flush);
+ drbd_bump_write_ordering(device->resource, WO_bdev_flush);

if (drbd_md_test_flag(device->ldev, MDF_CRASHED_PRIMARY))
set_bit(CRASHED_PRIMARY, &device->flags);
diff --git a/drivers/block/drbd/drbd_proc.c b/drivers/block/drbd/drbd_proc.c
index 89736bd..f11e573 100644
--- a/drivers/block/drbd/drbd_proc.c
+++ b/drivers/block/drbd/drbd_proc.c
@@ -281,7 +281,7 @@ static int drbd_seq_show(struct seq_file *seq, void *v)
atomic_read(&device->unacked_cnt),
atomic_read(&device->ap_bio_cnt),
first_peer_device(device)->connection->epochs,
- write_ordering_chars[first_peer_device(device)->connection->write_ordering]
+ write_ordering_chars[device->resource->write_ordering]
);
seq_printf(seq, " oos:%llu\n",
Bit2KB((unsigned long long)
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index 5b17ec8..c708418 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -1151,7 +1151,7 @@ static void drbd_flush(struct drbd_connection *connection)
struct drbd_peer_device *peer_device;
int vnr;

- if (connection->write_ordering >= WO_bdev_flush) {
+ if (connection->resource->write_ordering >= WO_bdev_flush) {
rcu_read_lock();
idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
struct drbd_device *device = peer_device->device;
@@ -1168,7 +1168,7 @@ static void drbd_flush(struct drbd_connection *connection)
/* would rather check on EOPNOTSUPP, but that is not reliable.
* don't try again for ANY return value != 0
* if (rv == -EOPNOTSUPP) */
- drbd_bump_write_ordering(connection, WO_drain_io);
+ drbd_bump_write_ordering(connection->resource, WO_drain_io);
}
put_ldev(device);
kref_put(&device->kref, drbd_destroy_device);
@@ -1262,10 +1262,10 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connectio
* @connection: DRBD connection.
* @wo: Write ordering method to try.
*/
-void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
+void drbd_bump_write_ordering(struct drbd_resource *resource, enum write_ordering_e wo)
{
struct disk_conf *dc;
- struct drbd_peer_device *peer_device;
+ struct drbd_device *device;
enum write_ordering_e pwo;
int vnr;
static char *write_ordering_str[] = {
@@ -1274,12 +1274,10 @@ void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ord
[WO_bdev_flush] = "flush",
};

- pwo = connection->write_ordering;
+ pwo = resource->write_ordering;
wo = min(pwo, wo);
rcu_read_lock();
- idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
- struct drbd_device *device = peer_device->device;
-
+ idr_for_each_entry(&resource->devices, device, vnr) {
if (!get_ldev_if_state(device, D_ATTACHING))
continue;
dc = rcu_dereference(device->ldev->disk_conf);
@@ -1291,9 +1289,9 @@ void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ord
put_ldev(device);
}
rcu_read_unlock();
- connection->write_ordering = wo;
- if (pwo != connection->write_ordering || wo == WO_bdev_flush)
- drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
+ resource->write_ordering = wo;
+ if (pwo != resource->write_ordering || wo == WO_bdev_flush)
+ drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
}

/**
@@ -1471,7 +1469,7 @@ static int receive_Barrier(struct drbd_connection *connection, struct packet_inf
* R_PRIMARY crashes now.
* Therefore we must send the barrier_ack after the barrier request was
* completed. */
- switch (connection->write_ordering) {
+ switch (connection->resource->write_ordering) {
case WO_none:
if (rv == FE_RECYCLED)
return 0;
@@ -1498,7 +1496,8 @@ static int receive_Barrier(struct drbd_connection *connection, struct packet_inf

return 0;
default:
- drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
+ drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
+ connection->resource->write_ordering);
return -EIO;
}

--
1.9.1