2006-08-28 16:08:41

by Dipankar Sarma

Subject: [PATCH 0/4] RCU: various merge candidates

This patchset consists of various merge candidates that would
do well to have some testing in -mm. It breaks the RCU
implementation out from its APIs to allow multiple
implementations, gives RCU its own softirq and finally lines
up preemptible RCU from the -rt tree as a configurable
RCU implementation for mainline.

All comments and testing are welcome. This is an RFC at the
moment, but I can later submit patches against -mm, Andrew, if
you want. They have been tested lightly using dbench, kernbench
and ltp (with both CONFIG_CLASSIC_RCU=y and n) on x86 and ppc64.
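
For reference, the following is a minimal, hypothetical sketch (not
part of the patchset) of caller-side RCU code; none of this changes
regardless of which implementation is configured:

#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/rcupdate.h>

struct foo {
	int data;
	struct rcu_head rcu;
};

static struct foo *global_foo;		/* RCU-protected pointer */
static DEFINE_SPINLOCK(foo_lock);	/* serializes updaters only */

/* Read side: no locks, just a read-side critical section. */
static int read_foo(void)
{
	struct foo *p;
	int val = -1;

	rcu_read_lock();
	p = rcu_dereference(global_foo);
	if (p)
		val = p->data;
	rcu_read_unlock();
	return val;
}

/* Invoked once a grace period has elapsed, i.e. after all readers
 * that might still see the old version have finished. */
static void free_foo_rcu(struct rcu_head *head)
{
	kfree(container_of(head, struct foo, rcu));
}

/* Update side: publish a new version, defer freeing the old one. */
static int update_foo(int data)
{
	struct foo *new, *old;

	new = kmalloc(sizeof(*new), GFP_KERNEL);
	if (!new)
		return -ENOMEM;
	new->data = data;

	spin_lock(&foo_lock);
	old = global_foo;
	rcu_assign_pointer(global_foo, new);
	spin_unlock(&foo_lock);

	if (old)
		call_rcu(&old->rcu, free_foo_rcu);
	return 0;
}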

Thanks
Dipankar


2006-08-28 16:10:13

by Dipankar Sarma

Subject: Re: [PATCH 1/4] RCU: split classic rcu


This patch re-organizes the RCU code to enable multiple implementations
of RCU. Users of RCU continue to include rcupdate.h and the
RCU interfaces remain the same. This is in preparation for
subsequently merging the preemptible RCU implementation.
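
As an illustration only (not part of this patch), the interface that an
implementation header pulled in by rcupdate.h is expected to supply
boils down to roughly the declarations below; classic RCU provides
several of them as macros, as rcuclassic.h in this patch shows:

/* Hypothetical summary of the per-implementation hooks behind
 * rcupdate.h; see include/linux/rcuclassic.h below for the real
 * classic-RCU version. */

/* read-side critical sections */
extern void __rcu_read_lock(void);
extern void __rcu_read_unlock(void);
extern void __rcu_read_lock_bh(void);
extern void __rcu_read_unlock_bh(void);

/* scheduler-based grace periods */
extern void __synchronize_sched(void);

/* hooks used by the RCU core, the scheduler and the timer code */
extern void __rcu_init(void);
extern void rcu_check_callbacks(int cpu, int user);
extern int rcu_pending(int cpu);
extern int rcu_needs_cpu(int cpu);
extern long rcu_batches_completed(void);
extern void rcu_qsctr_inc(int cpu);
extern void rcu_bh_qsctr_inc(int cpu);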

Signed-off-by: Dipankar Sarma <[email protected]>
---




include/linux/rcuclassic.h | 149 +++++++++++
include/linux/rcupdate.h | 153 +++---------
kernel/Makefile | 2
kernel/rcuclassic.c | 558 ++++++++++++++++++++++++++++++++++++++++++++
kernel/rcupdate.c | 559 ++-------------------------------------------
5 files changed, 781 insertions(+), 640 deletions(-)

diff -puN /dev/null include/linux/rcuclassic.h
--- /dev/null 2006-08-26 20:47:46.475534750 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/include/linux/rcuclassic.h 2006-08-27 00:52:40.000000000 +0530
@@ -0,0 +1,149 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion (classic version)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (C) IBM Corporation, 2001
+ *
+ * Author: Dipankar Sarma <[email protected]>
+ *
+ * Based on the original work by Paul McKenney <[email protected]>
+ * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
+ * Papers:
+ * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
+ * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * http://lse.sourceforge.net/locking/rcupdate.html
+ *
+ */
+
+#ifndef __LINUX_RCUCLASSIC_H
+#define __LINUX_RCUCLASSIC_H
+
+#ifdef __KERNEL__
+
+#include <linux/cache.h>
+#include <linux/spinlock.h>
+#include <linux/threads.h>
+#include <linux/percpu.h>
+#include <linux/cpumask.h>
+#include <linux/seqlock.h>
+
+
+/* Global control variables for rcupdate callback mechanism. */
+struct rcu_ctrlblk {
+ long cur; /* Current batch number. */
+ long completed; /* Number of the last completed batch */
+ int next_pending; /* Is the next batch already waiting? */
+
+ spinlock_t lock ____cacheline_internodealigned_in_smp;
+ cpumask_t cpumask; /* CPUs that need to switch in order */
+ /* for current batch to proceed. */
+} ____cacheline_internodealigned_in_smp;
+
+/* Is batch a before batch b ? */
+static inline int rcu_batch_before(long a, long b)
+{
+ return (a - b) < 0;
+}
+
+/* Is batch a after batch b ? */
+static inline int rcu_batch_after(long a, long b)
+{
+ return (a - b) > 0;
+}
+
+/*
+ * Per-CPU data for Read-Copy UPdate.
+ * nxtlist - new callbacks are added here
+ * curlist - current batch for which quiescent cycle started if any
+ */
+struct rcu_data {
+ /* 1) quiescent state handling : */
+ long quiescbatch; /* Batch # for grace period */
+ int passed_quiesc; /* User-mode/idle loop etc. */
+ int qs_pending; /* core waits for quiesc state */
+
+ /* 2) batch handling */
+ long batch; /* Batch # for current RCU batch */
+ struct rcu_head *nxtlist;
+ struct rcu_head **nxttail;
+ long qlen; /* # of queued callbacks */
+ struct rcu_head *curlist;
+ struct rcu_head **curtail;
+ struct rcu_head *donelist;
+ struct rcu_head **donetail;
+ long blimit; /* Upper limit on a processed batch */
+ int cpu;
+#ifdef CONFIG_SMP
+ long last_rs_qlen; /* qlen during the last resched */
+#endif
+};
+
+DECLARE_PER_CPU(struct rcu_data, rcu_data);
+DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);
+
+/*
+ * Increment the quiescent state counter.
+ * The counter is a bit degenerated: We do not need to know
+ * how many quiescent states passed, just if there was at least
+ * one since the start of the grace period. Thus just a flag.
+ */
+static inline void rcu_qsctr_inc(int cpu)
+{
+ struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+ rdp->passed_quiesc = 1;
+}
+static inline void rcu_bh_qsctr_inc(int cpu)
+{
+ struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
+ rdp->passed_quiesc = 1;
+}
+
+extern int rcu_pending(int cpu);
+extern int rcu_needs_cpu(int cpu);
+
+#define __rcu_read_lock() \
+ do { \
+ preempt_disable(); \
+ __acquire(RCU); \
+ } while(0)
+#define __rcu_read_unlock() \
+ do { \
+ __release(RCU); \
+ preempt_enable(); \
+ } while(0)
+
+#define __rcu_read_lock_bh() \
+ do { \
+ local_bh_disable(); \
+ __acquire(RCU_BH); \
+ } while(0)
+#define __rcu_read_unlock_bh() \
+ do { \
+ __release(RCU_BH); \
+ local_bh_enable(); \
+ } while(0)
+
+#define __synchronize_sched() synchronize_rcu()
+
+extern void __rcu_init(void);
+extern void rcu_check_callbacks(int cpu, int user);
+extern void rcu_restart_cpu(int cpu);
+extern long rcu_batches_completed(void);
+
+#endif /* __KERNEL__ */
+#endif /* __LINUX_RCUCLASSIC_H */
diff -puN include/linux/rcupdate.h~rcu-split-classic include/linux/rcupdate.h
--- linux-2.6.18-rc3-rcu/include/linux/rcupdate.h~rcu-split-classic 2006-08-06 03:07:10.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/include/linux/rcupdate.h 2006-08-27 00:48:50.000000000 +0530
@@ -41,6 +41,7 @@
#include <linux/percpu.h>
#include <linux/cpumask.h>
#include <linux/seqlock.h>
+#include <linux/rcuclassic.h>

/**
* struct rcu_head - callback structure for use with RCU
@@ -59,81 +60,6 @@ struct rcu_head {
} while (0)


-
-/* Global control variables for rcupdate callback mechanism. */
-struct rcu_ctrlblk {
- long cur; /* Current batch number. */
- long completed; /* Number of the last completed batch */
- int next_pending; /* Is the next batch already waiting? */
-
- spinlock_t lock ____cacheline_internodealigned_in_smp;
- cpumask_t cpumask; /* CPUs that need to switch in order */
- /* for current batch to proceed. */
-} ____cacheline_internodealigned_in_smp;
-
-/* Is batch a before batch b ? */
-static inline int rcu_batch_before(long a, long b)
-{
- return (a - b) < 0;
-}
-
-/* Is batch a after batch b ? */
-static inline int rcu_batch_after(long a, long b)
-{
- return (a - b) > 0;
-}
-
-/*
- * Per-CPU data for Read-Copy UPdate.
- * nxtlist - new callbacks are added here
- * curlist - current batch for which quiescent cycle started if any
- */
-struct rcu_data {
- /* 1) quiescent state handling : */
- long quiescbatch; /* Batch # for grace period */
- int passed_quiesc; /* User-mode/idle loop etc. */
- int qs_pending; /* core waits for quiesc state */
-
- /* 2) batch handling */
- long batch; /* Batch # for current RCU batch */
- struct rcu_head *nxtlist;
- struct rcu_head **nxttail;
- long qlen; /* # of queued callbacks */
- struct rcu_head *curlist;
- struct rcu_head **curtail;
- struct rcu_head *donelist;
- struct rcu_head **donetail;
- long blimit; /* Upper limit on a processed batch */
- int cpu;
- struct rcu_head barrier;
-#ifdef CONFIG_SMP
- long last_rs_qlen; /* qlen during the last resched */
-#endif
-};
-
-DECLARE_PER_CPU(struct rcu_data, rcu_data);
-DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);
-
-/*
- * Increment the quiescent state counter.
- * The counter is a bit degenerated: We do not need to know
- * how many quiescent states passed, just if there was at least
- * one since the start of the grace period. Thus just a flag.
- */
-static inline void rcu_qsctr_inc(int cpu)
-{
- struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
- rdp->passed_quiesc = 1;
-}
-static inline void rcu_bh_qsctr_inc(int cpu)
-{
- struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
- rdp->passed_quiesc = 1;
-}
-
-extern int rcu_pending(int cpu);
-extern int rcu_needs_cpu(int cpu);
-
/**
* rcu_read_lock - mark the beginning of an RCU read-side critical section.
*
@@ -163,22 +89,14 @@ extern int rcu_needs_cpu(int cpu);
*
* It is illegal to block while in an RCU read-side critical section.
*/
-#define rcu_read_lock() \
- do { \
- preempt_disable(); \
- __acquire(RCU); \
- } while(0)
+#define rcu_read_lock() __rcu_read_lock()

/**
* rcu_read_unlock - marks the end of an RCU read-side critical section.
*
* See rcu_read_lock() for more information.
*/
-#define rcu_read_unlock() \
- do { \
- __release(RCU); \
- preempt_enable(); \
- } while(0)
+#define rcu_read_unlock() __rcu_read_unlock()

/*
* So where is rcu_write_lock()? It does not exist, as there is no
@@ -201,23 +119,15 @@ extern int rcu_needs_cpu(int cpu);
* can use just rcu_read_lock().
*
*/
-#define rcu_read_lock_bh() \
- do { \
- local_bh_disable(); \
- __acquire(RCU_BH); \
- } while(0)
-
-/*
+#define rcu_read_lock_bh() __rcu_read_lock_bh()
+
+/**
* rcu_read_unlock_bh - marks the end of a softirq-only RCU critical section
*
* See rcu_read_lock_bh() for more information.
*/
-#define rcu_read_unlock_bh() \
- do { \
- __release(RCU_BH); \
- local_bh_enable(); \
- } while(0)
-
+#define rcu_read_unlock_bh() __rcu_read_unlock_bh()
+
/**
* rcu_dereference - fetch an RCU-protected pointer in an
* RCU read-side critical section. This pointer may later
@@ -268,22 +178,49 @@ extern int rcu_needs_cpu(int cpu);
* In "classic RCU", these two guarantees happen to be one and
* the same, but can differ in realtime RCU implementations.
*/
-#define synchronize_sched() synchronize_rcu()
+#define synchronize_sched() __synchronize_sched()
+
+/**
+ * call_rcu - Queue an RCU callback for invocation after a grace period.
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual update function to be invoked after the grace period
+ *
+ * The update function will be invoked some time after a full grace
+ * period elapses, in other words after all currently executing RCU
+ * read-side critical sections have completed. RCU read-side critical
+ * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
+ * and may be nested.
+ */
+extern void FASTCALL(call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *head)));

-extern void rcu_init(void);
-extern void rcu_check_callbacks(int cpu, int user);
-extern void rcu_restart_cpu(int cpu);
-extern long rcu_batches_completed(void);
-extern long rcu_batches_completed_bh(void);

-/* Exported interfaces */
-extern void FASTCALL(call_rcu(struct rcu_head *head,
- void (*func)(struct rcu_head *head)));
+/**
+ * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual update function to be invoked after the grace period
+ *
+ * The update function will be invoked some time after a full grace
+ * period elapses, in other words after all currently executing RCU
+ * read-side critical sections have completed. call_rcu_bh() assumes
+ * that the read-side critical sections end on completion of a softirq
+ * handler. This means that read-side critical sections in process
+ * context must not be interrupted by softirqs. This interface is to be
+ * used when most of the read-side critical sections are in softirq context.
+ * RCU read-side critical sections are delimited by rcu_read_lock() and
+ * rcu_read_unlock(), * if in interrupt context or rcu_read_lock_bh()
+ * and rcu_read_unlock_bh(), if in process context. These may be nested.
+ */
extern void FASTCALL(call_rcu_bh(struct rcu_head *head,
void (*func)(struct rcu_head *head)));
+
+/* Exported common interfaces */
extern void synchronize_rcu(void);
-void synchronize_idle(void);
extern void rcu_barrier(void);
+
+/* Internal to kernel */
+extern void rcu_init(void);
+extern void rcu_check_callbacks(int cpu, int user);

#endif /* __KERNEL__ */
#endif /* __LINUX_RCUPDATE_H */
diff -puN kernel/Makefile~rcu-split-classic kernel/Makefile
--- linux-2.6.18-rc3-rcu/kernel/Makefile~rcu-split-classic 2006-08-06 03:07:10.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/kernel/Makefile 2006-08-27 00:48:50.000000000 +0530
@@ -6,7 +6,7 @@ obj-y = sched.o fork.o exec_domain.o
exit.o itimer.o time.o softirq.o resource.o \
sysctl.o capability.o ptrace.o timer.o user.o \
signal.o sys.o kmod.o workqueue.o pid.o \
- rcupdate.o extable.o params.o posix-timers.o \
+ rcupdate.o rcuclassic.o extable.o params.o posix-timers.o \
kthread.o wait.o kfifo.o sys_ni.o posix-cpu-timers.o mutex.o \
hrtimer.o rwsem.o

diff -puN /dev/null kernel/rcuclassic.c
--- /dev/null 2006-08-26 20:47:46.475534750 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/kernel/rcuclassic.c 2006-08-27 00:49:58.000000000 +0530
@@ -0,0 +1,558 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion, classic implementation
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (C) IBM Corporation, 2001
+ *
+ * Authors: Dipankar Sarma <[email protected]>
+ * Manfred Spraul <[email protected]>
+ *
+ * Based on the original work by Paul McKenney <[email protected]>
+ * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
+ *
+ * Papers: http://www.rdrop.com/users/paulmck/RCU
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * Documentation/RCU/ *.txt
+ *
+ */
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/spinlock.h>
+#include <linux/smp.h>
+#include <linux/rcupdate.h>
+#include <linux/interrupt.h>
+#include <linux/sched.h>
+#include <asm/atomic.h>
+#include <linux/bitops.h>
+#include <linux/module.h>
+#include <linux/completion.h>
+#include <linux/moduleparam.h>
+#include <linux/percpu.h>
+#include <linux/notifier.h>
+#include <linux/rcupdate.h>
+#include <linux/cpu.h>
+#include <linux/random.h>
+#include <linux/delay.h>
+#include <linux/byteorder/swabb.h>
+
+
+/* Definition for rcupdate control block. */
+static struct rcu_ctrlblk rcu_ctrlblk = {
+ .cur = -300,
+ .completed = -300,
+ .lock = SPIN_LOCK_UNLOCKED,
+ .cpumask = CPU_MASK_NONE,
+};
+static struct rcu_ctrlblk rcu_bh_ctrlblk = {
+ .cur = -300,
+ .completed = -300,
+ .lock = SPIN_LOCK_UNLOCKED,
+ .cpumask = CPU_MASK_NONE,
+};
+
+DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
+DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
+
+/* Fake initialization required by compiler */
+static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
+static int blimit = 10;
+static int qhimark = 10000;
+static int qlowmark = 100;
+#ifdef CONFIG_SMP
+static int rsinterval = 1000;
+#endif
+
+#ifdef CONFIG_SMP
+static void force_quiescent_state(struct rcu_data *rdp,
+ struct rcu_ctrlblk *rcp)
+{
+ int cpu;
+ cpumask_t cpumask;
+ set_need_resched();
+ if (unlikely(rdp->qlen - rdp->last_rs_qlen > rsinterval)) {
+ rdp->last_rs_qlen = rdp->qlen;
+ /*
+ * Don't send IPI to itself. With irqs disabled,
+ * rdp->cpu is the current cpu.
+ */
+ cpumask = rcp->cpumask;
+ cpu_clear(rdp->cpu, cpumask);
+ for_each_cpu_mask(cpu, cpumask)
+ smp_send_reschedule(cpu);
+ }
+}
+#else
+static inline void force_quiescent_state(struct rcu_data *rdp,
+ struct rcu_ctrlblk *rcp)
+{
+ set_need_resched();
+}
+#endif
+
+/*
+ * call_rcu - Queue an RCU callback for invocation after a grace period.
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual update function to be invoked after the grace period
+ *
+ * The update function will be invoked some time after a full grace
+ * period elapses, in other words after all currently executing RCU
+ * read-side critical sections have completed. RCU read-side critical
+ * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
+ * and may be nested.
+ */
+void fastcall call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *rcu))
+{
+ unsigned long flags;
+ struct rcu_data *rdp;
+
+ head->func = func;
+ head->next = NULL;
+ local_irq_save(flags);
+ rdp = &__get_cpu_var(rcu_data);
+ *rdp->nxttail = head;
+ rdp->nxttail = &head->next;
+ if (unlikely(++rdp->qlen > qhimark)) {
+ rdp->blimit = INT_MAX;
+ force_quiescent_state(rdp, &rcu_ctrlblk);
+ }
+ local_irq_restore(flags);
+}
+
+/*
+ * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual update function to be invoked after the grace period
+ *
+ * The update function will be invoked some time after a full grace
+ * period elapses, in other words after all currently executing RCU
+ * read-side critical sections have completed. call_rcu_bh() assumes
+ * that the read-side critical sections end on completion of a softirq
+ * handler. This means that read-side critical sections in process
+ * context must not be interrupted by softirqs. This interface is to be
+ * used when most of the read-side critical sections are in softirq context.
+ * RCU read-side critical sections are delimited by rcu_read_lock() and
+ * rcu_read_unlock(), * if in interrupt context or rcu_read_lock_bh()
+ * and rcu_read_unlock_bh(), if in process context. These may be nested.
+ */
+void fastcall call_rcu_bh(struct rcu_head *head,
+ void (*func)(struct rcu_head *rcu))
+{
+ unsigned long flags;
+ struct rcu_data *rdp;
+
+ head->func = func;
+ head->next = NULL;
+ local_irq_save(flags);
+ rdp = &__get_cpu_var(rcu_bh_data);
+ *rdp->nxttail = head;
+ rdp->nxttail = &head->next;
+
+ if (unlikely(++rdp->qlen > qhimark)) {
+ rdp->blimit = INT_MAX;
+ force_quiescent_state(rdp, &rcu_bh_ctrlblk);
+ }
+
+ local_irq_restore(flags);
+}
+
+/*
+ * Return the number of RCU batches processed thus far. Useful
+ * for debug and statistics.
+ */
+long rcu_batches_completed(void)
+{
+ return rcu_ctrlblk.completed;
+}
+
+/*
+ * Return the number of RCU batches processed thus far. Useful
+ * for debug and statistics.
+ */
+long rcu_batches_completed_bh(void)
+{
+ return rcu_bh_ctrlblk.completed;
+}
+
+/*
+ * Invoke the completed RCU callbacks. They are expected to be in
+ * a per-cpu list.
+ */
+static void rcu_do_batch(struct rcu_data *rdp)
+{
+ struct rcu_head *next, *list;
+ int count = 0;
+
+ list = rdp->donelist;
+ while (list) {
+ next = rdp->donelist = list->next;
+ list->func(list);
+ list = next;
+ rdp->qlen--;
+ if (++count >= rdp->blimit)
+ break;
+ }
+ if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
+ rdp->blimit = blimit;
+ if (!rdp->donelist)
+ rdp->donetail = &rdp->donelist;
+ else
+ tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
+}
+
+/*
+ * Grace period handling:
+ * The grace period handling consists out of two steps:
+ * - A new grace period is started.
+ * This is done by rcu_start_batch. The start is not broadcasted to
+ * all cpus, they must pick this up by comparing rcp->cur with
+ * rdp->quiescbatch. All cpus are recorded in the
+ * rcu_ctrlblk.cpumask bitmap.
+ * - All cpus must go through a quiescent state.
+ * Since the start of the grace period is not broadcasted, at least two
+ * calls to rcu_check_quiescent_state are required:
+ * The first call just notices that a new grace period is running. The
+ * following calls check if there was a quiescent state since the beginning
+ * of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
+ * the bitmap is empty, then the grace period is completed.
+ * rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
+ * period (if necessary).
+ */
+/*
+ * Register a new batch of callbacks, and start it up if there is currently no
+ * active batch and the batch to be registered has not already occurred.
+ * Caller must hold rcu_ctrlblk.lock.
+ */
+static void rcu_start_batch(struct rcu_ctrlblk *rcp)
+{
+ if (rcp->next_pending &&
+ rcp->completed == rcp->cur) {
+ rcp->next_pending = 0;
+ /*
+ * next_pending == 0 must be visible in
+ * __rcu_process_callbacks() before it can see new value of cur.
+ */
+ smp_wmb();
+ rcp->cur++;
+
+ /*
+ * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
+ * Barrier Otherwise it can cause tickless idle CPUs to be
+ * included in rcp->cpumask, which will extend graceperiods
+ * unnecessarily.
+ */
+ smp_mb();
+ cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
+
+ }
+}
+
+/*
+ * cpu went through a quiescent state since the beginning of the grace period.
+ * Clear it from the cpu mask and complete the grace period if it was the last
+ * cpu. Start another grace period if someone has further entries pending
+ */
+static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
+{
+ cpu_clear(cpu, rcp->cpumask);
+ if (cpus_empty(rcp->cpumask)) {
+ /* batch completed ! */
+ rcp->completed = rcp->cur;
+ rcu_start_batch(rcp);
+ }
+}
+
+/*
+ * Check if the cpu has gone through a quiescent state (say context
+ * switch). If so and if it already hasn't done so in this RCU
+ * quiescent cycle, then indicate that it has done so.
+ */
+static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
+ struct rcu_data *rdp)
+{
+ if (rdp->quiescbatch != rcp->cur) {
+ /* start new grace period: */
+ rdp->qs_pending = 1;
+ rdp->passed_quiesc = 0;
+ rdp->quiescbatch = rcp->cur;
+ return;
+ }
+
+ /* Grace period already completed for this cpu?
+ * qs_pending is checked instead of the actual bitmap to avoid
+ * cacheline trashing.
+ */
+ if (!rdp->qs_pending)
+ return;
+
+ /*
+ * Was there a quiescent state since the beginning of the grace
+ * period? If no, then exit and wait for the next call.
+ */
+ if (!rdp->passed_quiesc)
+ return;
+ rdp->qs_pending = 0;
+
+ spin_lock(&rcp->lock);
+ /*
+ * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
+ * during cpu startup. Ignore the quiescent state.
+ */
+ if (likely(rdp->quiescbatch == rcp->cur))
+ cpu_quiet(rdp->cpu, rcp);
+
+ spin_unlock(&rcp->lock);
+}
+
+
+#ifdef CONFIG_HOTPLUG_CPU
+
+/* warning! helper for rcu_offline_cpu. do not use elsewhere without reviewing
+ * locking requirements, the list it's pulling from has to belong to a cpu
+ * which is dead and hence not processing interrupts.
+ */
+static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
+ struct rcu_head **tail)
+{
+ local_irq_disable();
+ *this_rdp->nxttail = list;
+ if (list)
+ this_rdp->nxttail = tail;
+ local_irq_enable();
+}
+
+static void __rcu_offline_cpu(struct rcu_data *this_rdp,
+ struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
+{
+ /* if the cpu going offline owns the grace period
+ * we can block indefinitely waiting for it, so flush
+ * it here
+ */
+ spin_lock_bh(&rcp->lock);
+ if (rcp->cur != rcp->completed)
+ cpu_quiet(rdp->cpu, rcp);
+ spin_unlock_bh(&rcp->lock);
+ rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
+ rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
+ rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
+}
+
+static void rcu_offline_cpu(int cpu)
+{
+ struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
+ struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
+
+ __rcu_offline_cpu(this_rdp, &rcu_ctrlblk,
+ &per_cpu(rcu_data, cpu));
+ __rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk,
+ &per_cpu(rcu_bh_data, cpu));
+ put_cpu_var(rcu_data);
+ put_cpu_var(rcu_bh_data);
+ tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
+}
+
+#else
+
+static void rcu_offline_cpu(int cpu)
+{
+}
+
+#endif
+
+/*
+ * This does the RCU processing work from tasklet context.
+ */
+static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
+ struct rcu_data *rdp)
+{
+ if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
+ *rdp->donetail = rdp->curlist;
+ rdp->donetail = rdp->curtail;
+ rdp->curlist = NULL;
+ rdp->curtail = &rdp->curlist;
+ }
+
+ if (rdp->nxtlist && !rdp->curlist) {
+ local_irq_disable();
+ rdp->curlist = rdp->nxtlist;
+ rdp->curtail = rdp->nxttail;
+ rdp->nxtlist = NULL;
+ rdp->nxttail = &rdp->nxtlist;
+ local_irq_enable();
+
+ /*
+ * start the next batch of callbacks
+ */
+
+ /* determine batch number */
+ rdp->batch = rcp->cur + 1;
+ /* see the comment and corresponding wmb() in
+ * the rcu_start_batch()
+ */
+ smp_rmb();
+
+ if (!rcp->next_pending) {
+ /* and start it/schedule start if it's a new batch */
+ spin_lock(&rcp->lock);
+ rcp->next_pending = 1;
+ rcu_start_batch(rcp);
+ spin_unlock(&rcp->lock);
+ }
+ }
+
+ rcu_check_quiescent_state(rcp, rdp);
+ if (rdp->donelist)
+ rcu_do_batch(rdp);
+}
+
+static void rcu_process_callbacks(unsigned long unused)
+{
+ __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
+ __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
+}
+
+static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
+{
+ /* This cpu has pending rcu entries and the grace period
+ * for them has completed.
+ */
+ if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
+ return 1;
+
+ /* This cpu has no pending entries, but there are new entries */
+ if (!rdp->curlist && rdp->nxtlist)
+ return 1;
+
+ /* This cpu has finished callbacks to invoke */
+ if (rdp->donelist)
+ return 1;
+
+ /* The rcu core waits for a quiescent state from the cpu */
+ if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
+ return 1;
+
+ /* nothing to do */
+ return 0;
+}
+
+/*
+ * Check to see if there is any immediate RCU-related work to be done
+ * by the current CPU, returning 1 if so. This function is part of the
+ * RCU implementation; it is -not- an exported member of the RCU API.
+ */
+int rcu_pending(int cpu)
+{
+ return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
+ __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
+}
+
+/*
+ * Check to see if any future RCU-related work will need to be done
+ * by the current CPU, even if none need be done immediately, returning
+ * 1 if so. This function is part of the RCU implementation; it is -not-
+ * an exported member of the RCU API.
+ */
+int rcu_needs_cpu(int cpu)
+{
+ struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+ struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
+
+ return (!!rdp->curlist || !!rdp_bh->curlist || rcu_pending(cpu));
+}
+
+void rcu_check_callbacks(int cpu, int user)
+{
+ if (user ||
+ (idle_cpu(cpu) && !in_softirq() &&
+ hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
+ rcu_qsctr_inc(cpu);
+ rcu_bh_qsctr_inc(cpu);
+ } else if (!in_softirq())
+ rcu_bh_qsctr_inc(cpu);
+ tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
+}
+
+static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
+ struct rcu_data *rdp)
+{
+ memset(rdp, 0, sizeof(*rdp));
+ rdp->curtail = &rdp->curlist;
+ rdp->nxttail = &rdp->nxtlist;
+ rdp->donetail = &rdp->donelist;
+ rdp->quiescbatch = rcp->completed;
+ rdp->qs_pending = 0;
+ rdp->cpu = cpu;
+ rdp->blimit = blimit;
+}
+
+static void __devinit rcu_online_cpu(int cpu)
+{
+ struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+ struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
+
+ rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
+ rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
+ tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
+}
+
+static int __devinit rcu_cpu_notify(struct notifier_block *self,
+ unsigned long action, void *hcpu)
+{
+ long cpu = (long)hcpu;
+ switch (action) {
+ case CPU_UP_PREPARE:
+ rcu_online_cpu(cpu);
+ break;
+ case CPU_DEAD:
+ rcu_offline_cpu(cpu);
+ break;
+ default:
+ break;
+ }
+ return NOTIFY_OK;
+}
+
+static struct notifier_block __devinitdata rcu_nb = {
+ .notifier_call = rcu_cpu_notify,
+};
+
+/*
+ * Initializes rcu mechanism. Assumed to be called early.
+ * That is before local timer(SMP) or jiffie timer (uniproc) is setup.
+ * Note that rcu_qsctr and friends are implicitly
+ * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
+ */
+void __init __rcu_init(void)
+{
+ rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
+ (void *)(long)smp_processor_id());
+ /* Register notifier for non-boot CPUs */
+ register_cpu_notifier(&rcu_nb);
+}
+
+module_param(blimit, int, 0);
+module_param(qhimark, int, 0);
+module_param(qlowmark, int, 0);
+#ifdef CONFIG_SMP
+module_param(rsinterval, int, 0);
+#endif
+
+EXPORT_SYMBOL_GPL(rcu_batches_completed);
+EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
+EXPORT_SYMBOL_GPL(call_rcu);
+EXPORT_SYMBOL_GPL(call_rcu_bh);
diff -puN kernel/rcupdate.c~rcu-split-classic kernel/rcupdate.c
--- linux-2.6.18-rc3-rcu/kernel/rcupdate.c~rcu-split-classic 2006-08-06 03:07:10.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/kernel/rcupdate.c 2006-08-06 09:58:28.000000000 +0530
@@ -40,155 +40,53 @@
#include <linux/sched.h>
#include <asm/atomic.h>
#include <linux/bitops.h>
-#include <linux/module.h>
#include <linux/completion.h>
-#include <linux/moduleparam.h>
#include <linux/percpu.h>
-#include <linux/notifier.h>
-#include <linux/rcupdate.h>
#include <linux/cpu.h>
#include <linux/mutex.h>
+#include <linux/module.h>

-/* Definition for rcupdate control block. */
-static struct rcu_ctrlblk rcu_ctrlblk = {
- .cur = -300,
- .completed = -300,
- .lock = __SPIN_LOCK_UNLOCKED(&rcu_ctrlblk.lock),
- .cpumask = CPU_MASK_NONE,
-};
-static struct rcu_ctrlblk rcu_bh_ctrlblk = {
- .cur = -300,
- .completed = -300,
- .lock = __SPIN_LOCK_UNLOCKED(&rcu_bh_ctrlblk.lock),
- .cpumask = CPU_MASK_NONE,
+struct rcu_synchronize {
+ struct rcu_head head;
+ struct completion completion;
};

-DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
-DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
-
-/* Fake initialization required by compiler */
-static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
-static int blimit = 10;
-static int qhimark = 10000;
-static int qlowmark = 100;
-#ifdef CONFIG_SMP
-static int rsinterval = 1000;
-#endif
-
+static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head);
static atomic_t rcu_barrier_cpu_count;
static DEFINE_MUTEX(rcu_barrier_mutex);
static struct completion rcu_barrier_completion;

-#ifdef CONFIG_SMP
-static void force_quiescent_state(struct rcu_data *rdp,
- struct rcu_ctrlblk *rcp)
-{
- int cpu;
- cpumask_t cpumask;
- set_need_resched();
- if (unlikely(rdp->qlen - rdp->last_rs_qlen > rsinterval)) {
- rdp->last_rs_qlen = rdp->qlen;
- /*
- * Don't send IPI to itself. With irqs disabled,
- * rdp->cpu is the current cpu.
- */
- cpumask = rcp->cpumask;
- cpu_clear(rdp->cpu, cpumask);
- for_each_cpu_mask(cpu, cpumask)
- smp_send_reschedule(cpu);
- }
-}
-#else
-static inline void force_quiescent_state(struct rcu_data *rdp,
- struct rcu_ctrlblk *rcp)
+/* Because of FASTCALL declaration of complete, we use this wrapper */
+static void wakeme_after_rcu(struct rcu_head *head)
{
- set_need_resched();
+ struct rcu_synchronize *rcu;
+
+ rcu = container_of(head, struct rcu_synchronize, head);
+ complete(&rcu->completion);
}
-#endif

/**
- * call_rcu - Queue an RCU callback for invocation after a grace period.
- * @head: structure to be used for queueing the RCU updates.
- * @func: actual update function to be invoked after the grace period
+ * synchronize_rcu - wait until a grace period has elapsed.
*
- * The update function will be invoked some time after a full grace
- * period elapses, in other words after all currently executing RCU
+ * Control will return to the caller some time after a full grace
+ * period has elapsed, in other words after all currently executing RCU
* read-side critical sections have completed. RCU read-side critical
* sections are delimited by rcu_read_lock() and rcu_read_unlock(),
* and may be nested.
- */
-void fastcall call_rcu(struct rcu_head *head,
- void (*func)(struct rcu_head *rcu))
-{
- unsigned long flags;
- struct rcu_data *rdp;
-
- head->func = func;
- head->next = NULL;
- local_irq_save(flags);
- rdp = &__get_cpu_var(rcu_data);
- *rdp->nxttail = head;
- rdp->nxttail = &head->next;
- if (unlikely(++rdp->qlen > qhimark)) {
- rdp->blimit = INT_MAX;
- force_quiescent_state(rdp, &rcu_ctrlblk);
- }
- local_irq_restore(flags);
-}
-
-/**
- * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
- * @head: structure to be used for queueing the RCU updates.
- * @func: actual update function to be invoked after the grace period
*
- * The update function will be invoked some time after a full grace
- * period elapses, in other words after all currently executing RCU
- * read-side critical sections have completed. call_rcu_bh() assumes
- * that the read-side critical sections end on completion of a softirq
- * handler. This means that read-side critical sections in process
- * context must not be interrupted by softirqs. This interface is to be
- * used when most of the read-side critical sections are in softirq context.
- * RCU read-side critical sections are delimited by rcu_read_lock() and
- * rcu_read_unlock(), * if in interrupt context or rcu_read_lock_bh()
- * and rcu_read_unlock_bh(), if in process context. These may be nested.
- */
-void fastcall call_rcu_bh(struct rcu_head *head,
- void (*func)(struct rcu_head *rcu))
-{
- unsigned long flags;
- struct rcu_data *rdp;
-
- head->func = func;
- head->next = NULL;
- local_irq_save(flags);
- rdp = &__get_cpu_var(rcu_bh_data);
- *rdp->nxttail = head;
- rdp->nxttail = &head->next;
-
- if (unlikely(++rdp->qlen > qhimark)) {
- rdp->blimit = INT_MAX;
- force_quiescent_state(rdp, &rcu_bh_ctrlblk);
- }
-
- local_irq_restore(flags);
-}
-
-/*
- * Return the number of RCU batches processed thus far. Useful
- * for debug and statistics.
- */
-long rcu_batches_completed(void)
-{
- return rcu_ctrlblk.completed;
-}
-
-/*
- * Return the number of RCU batches processed thus far. Useful
- * for debug and statistics.
+ * If your read-side code is not protected by rcu_read_lock(), do -not-
+ * use synchronize_rcu().
*/
-long rcu_batches_completed_bh(void)
+void synchronize_rcu(void)
{
- return rcu_bh_ctrlblk.completed;
+ struct rcu_synchronize rcu;
+
+ init_completion(&rcu.completion);
+ /* Will wake me after RCU finished */
+ call_rcu(&rcu.head, wakeme_after_rcu);
+
+ /* Wait for it */
+ wait_for_completion(&rcu.completion);
}

static void rcu_barrier_callback(struct rcu_head *notused)
@@ -203,10 +101,8 @@ static void rcu_barrier_callback(struct
static void rcu_barrier_func(void *notused)
{
int cpu = smp_processor_id();
- struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
- struct rcu_head *head;
+ struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);

- head = &rdp->barrier;
atomic_inc(&rcu_barrier_cpu_count);
call_rcu(head, rcu_barrier_callback);
}
@@ -225,410 +121,11 @@ void rcu_barrier(void)
wait_for_completion(&rcu_barrier_completion);
mutex_unlock(&rcu_barrier_mutex);
}
-EXPORT_SYMBOL_GPL(rcu_barrier);
-
-/*
- * Invoke the completed RCU callbacks. They are expected to be in
- * a per-cpu list.
- */
-static void rcu_do_batch(struct rcu_data *rdp)
-{
- struct rcu_head *next, *list;
- int count = 0;
-
- list = rdp->donelist;
- while (list) {
- next = rdp->donelist = list->next;
- list->func(list);
- list = next;
- rdp->qlen--;
- if (++count >= rdp->blimit)
- break;
- }
- if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
- rdp->blimit = blimit;
- if (!rdp->donelist)
- rdp->donetail = &rdp->donelist;
- else
- tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
-}
-
-/*
- * Grace period handling:
- * The grace period handling consists out of two steps:
- * - A new grace period is started.
- * This is done by rcu_start_batch. The start is not broadcasted to
- * all cpus, they must pick this up by comparing rcp->cur with
- * rdp->quiescbatch. All cpus are recorded in the
- * rcu_ctrlblk.cpumask bitmap.
- * - All cpus must go through a quiescent state.
- * Since the start of the grace period is not broadcasted, at least two
- * calls to rcu_check_quiescent_state are required:
- * The first call just notices that a new grace period is running. The
- * following calls check if there was a quiescent state since the beginning
- * of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
- * the bitmap is empty, then the grace period is completed.
- * rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
- * period (if necessary).
- */
-/*
- * Register a new batch of callbacks, and start it up if there is currently no
- * active batch and the batch to be registered has not already occurred.
- * Caller must hold rcu_ctrlblk.lock.
- */
-static void rcu_start_batch(struct rcu_ctrlblk *rcp)
-{
- if (rcp->next_pending &&
- rcp->completed == rcp->cur) {
- rcp->next_pending = 0;
- /*
- * next_pending == 0 must be visible in
- * __rcu_process_callbacks() before it can see new value of cur.
- */
- smp_wmb();
- rcp->cur++;
-
- /*
- * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
- * Barrier Otherwise it can cause tickless idle CPUs to be
- * included in rcp->cpumask, which will extend graceperiods
- * unnecessarily.
- */
- smp_mb();
- cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
-
- }
-}
-
-/*
- * cpu went through a quiescent state since the beginning of the grace period.
- * Clear it from the cpu mask and complete the grace period if it was the last
- * cpu. Start another grace period if someone has further entries pending
- */
-static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
-{
- cpu_clear(cpu, rcp->cpumask);
- if (cpus_empty(rcp->cpumask)) {
- /* batch completed ! */
- rcp->completed = rcp->cur;
- rcu_start_batch(rcp);
- }
-}
-
-/*
- * Check if the cpu has gone through a quiescent state (say context
- * switch). If so and if it already hasn't done so in this RCU
- * quiescent cycle, then indicate that it has done so.
- */
-static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
- struct rcu_data *rdp)
-{
- if (rdp->quiescbatch != rcp->cur) {
- /* start new grace period: */
- rdp->qs_pending = 1;
- rdp->passed_quiesc = 0;
- rdp->quiescbatch = rcp->cur;
- return;
- }
-
- /* Grace period already completed for this cpu?
- * qs_pending is checked instead of the actual bitmap to avoid
- * cacheline trashing.
- */
- if (!rdp->qs_pending)
- return;
-
- /*
- * Was there a quiescent state since the beginning of the grace
- * period? If no, then exit and wait for the next call.
- */
- if (!rdp->passed_quiesc)
- return;
- rdp->qs_pending = 0;
-
- spin_lock(&rcp->lock);
- /*
- * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
- * during cpu startup. Ignore the quiescent state.
- */
- if (likely(rdp->quiescbatch == rcp->cur))
- cpu_quiet(rdp->cpu, rcp);
-
- spin_unlock(&rcp->lock);
-}
-
-
-#ifdef CONFIG_HOTPLUG_CPU
-
-/* warning! helper for rcu_offline_cpu. do not use elsewhere without reviewing
- * locking requirements, the list it's pulling from has to belong to a cpu
- * which is dead and hence not processing interrupts.
- */
-static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
- struct rcu_head **tail)
-{
- local_irq_disable();
- *this_rdp->nxttail = list;
- if (list)
- this_rdp->nxttail = tail;
- local_irq_enable();
-}
-
-static void __rcu_offline_cpu(struct rcu_data *this_rdp,
- struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
-{
- /* if the cpu going offline owns the grace period
- * we can block indefinitely waiting for it, so flush
- * it here
- */
- spin_lock_bh(&rcp->lock);
- if (rcp->cur != rcp->completed)
- cpu_quiet(rdp->cpu, rcp);
- spin_unlock_bh(&rcp->lock);
- rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
- rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
- rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
-}
-
-static void rcu_offline_cpu(int cpu)
-{
- struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
- struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
-
- __rcu_offline_cpu(this_rdp, &rcu_ctrlblk,
- &per_cpu(rcu_data, cpu));
- __rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk,
- &per_cpu(rcu_bh_data, cpu));
- put_cpu_var(rcu_data);
- put_cpu_var(rcu_bh_data);
- tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
-}

-#else
-
-static void rcu_offline_cpu(int cpu)
-{
-}
-
-#endif
-
-/*
- * This does the RCU processing work from tasklet context.
- */
-static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
- struct rcu_data *rdp)
-{
- if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
- *rdp->donetail = rdp->curlist;
- rdp->donetail = rdp->curtail;
- rdp->curlist = NULL;
- rdp->curtail = &rdp->curlist;
- }
-
- if (rdp->nxtlist && !rdp->curlist) {
- local_irq_disable();
- rdp->curlist = rdp->nxtlist;
- rdp->curtail = rdp->nxttail;
- rdp->nxtlist = NULL;
- rdp->nxttail = &rdp->nxtlist;
- local_irq_enable();
-
- /*
- * start the next batch of callbacks
- */
-
- /* determine batch number */
- rdp->batch = rcp->cur + 1;
- /* see the comment and corresponding wmb() in
- * the rcu_start_batch()
- */
- smp_rmb();
-
- if (!rcp->next_pending) {
- /* and start it/schedule start if it's a new batch */
- spin_lock(&rcp->lock);
- rcp->next_pending = 1;
- rcu_start_batch(rcp);
- spin_unlock(&rcp->lock);
- }
- }
-
- rcu_check_quiescent_state(rcp, rdp);
- if (rdp->donelist)
- rcu_do_batch(rdp);
-}
-
-static void rcu_process_callbacks(unsigned long unused)
-{
- __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
- __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
-}
-
-static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
-{
- /* This cpu has pending rcu entries and the grace period
- * for them has completed.
- */
- if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
- return 1;
-
- /* This cpu has no pending entries, but there are new entries */
- if (!rdp->curlist && rdp->nxtlist)
- return 1;
-
- /* This cpu has finished callbacks to invoke */
- if (rdp->donelist)
- return 1;
-
- /* The rcu core waits for a quiescent state from the cpu */
- if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
- return 1;
-
- /* nothing to do */
- return 0;
-}
-
-/*
- * Check to see if there is any immediate RCU-related work to be done
- * by the current CPU, returning 1 if so. This function is part of the
- * RCU implementation; it is -not- an exported member of the RCU API.
- */
-int rcu_pending(int cpu)
-{
- return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
- __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
-}
-
-/*
- * Check to see if any future RCU-related work will need to be done
- * by the current CPU, even if none need be done immediately, returning
- * 1 if so. This function is part of the RCU implementation; it is -not-
- * an exported member of the RCU API.
- */
-int rcu_needs_cpu(int cpu)
-{
- struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
- struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
-
- return (!!rdp->curlist || !!rdp_bh->curlist || rcu_pending(cpu));
-}
-
-void rcu_check_callbacks(int cpu, int user)
-{
- if (user ||
- (idle_cpu(cpu) && !in_softirq() &&
- hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
- rcu_qsctr_inc(cpu);
- rcu_bh_qsctr_inc(cpu);
- } else if (!in_softirq())
- rcu_bh_qsctr_inc(cpu);
- tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
-}
-
-static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
- struct rcu_data *rdp)
-{
- memset(rdp, 0, sizeof(*rdp));
- rdp->curtail = &rdp->curlist;
- rdp->nxttail = &rdp->nxtlist;
- rdp->donetail = &rdp->donelist;
- rdp->quiescbatch = rcp->completed;
- rdp->qs_pending = 0;
- rdp->cpu = cpu;
- rdp->blimit = blimit;
-}
-
-static void __devinit rcu_online_cpu(int cpu)
-{
- struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
- struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
-
- rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
- rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
- tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
-}
-
-static int __devinit rcu_cpu_notify(struct notifier_block *self,
- unsigned long action, void *hcpu)
-{
- long cpu = (long)hcpu;
- switch (action) {
- case CPU_UP_PREPARE:
- rcu_online_cpu(cpu);
- break;
- case CPU_DEAD:
- rcu_offline_cpu(cpu);
- break;
- default:
- break;
- }
- return NOTIFY_OK;
-}
-
-static struct notifier_block __devinitdata rcu_nb = {
- .notifier_call = rcu_cpu_notify,
-};
-
-/*
- * Initializes rcu mechanism. Assumed to be called early.
- * That is before local timer(SMP) or jiffie timer (uniproc) is setup.
- * Note that rcu_qsctr and friends are implicitly
- * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
- */
void __init rcu_init(void)
{
- rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
- (void *)(long)smp_processor_id());
- /* Register notifier for non-boot CPUs */
- register_cpu_notifier(&rcu_nb);
-}
-
-struct rcu_synchronize {
- struct rcu_head head;
- struct completion completion;
-};
-
-/* Because of FASTCALL declaration of complete, we use this wrapper */
-static void wakeme_after_rcu(struct rcu_head *head)
-{
- struct rcu_synchronize *rcu;
-
- rcu = container_of(head, struct rcu_synchronize, head);
- complete(&rcu->completion);
-}
-
-/**
- * synchronize_rcu - wait until a grace period has elapsed.
- *
- * Control will return to the caller some time after a full grace
- * period has elapsed, in other words after all currently executing RCU
- * read-side critical sections have completed. RCU read-side critical
- * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
- * and may be nested.
- *
- * If your read-side code is not protected by rcu_read_lock(), do -not-
- * use synchronize_rcu().
- */
-void synchronize_rcu(void)
-{
- struct rcu_synchronize rcu;
-
- init_completion(&rcu.completion);
- /* Will wake me after RCU finished */
- call_rcu(&rcu.head, wakeme_after_rcu);
-
- /* Wait for it */
- wait_for_completion(&rcu.completion);
+ __rcu_init();
}

-module_param(blimit, int, 0);
-module_param(qhimark, int, 0);
-module_param(qlowmark, int, 0);
-#ifdef CONFIG_SMP
-module_param(rsinterval, int, 0);
-#endif
-EXPORT_SYMBOL_GPL(rcu_batches_completed);
-EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
-EXPORT_SYMBOL_GPL(call_rcu);
-EXPORT_SYMBOL_GPL(call_rcu_bh);
+EXPORT_SYMBOL_GPL(rcu_barrier);
EXPORT_SYMBOL_GPL(synchronize_rcu);

_

2006-08-28 16:11:07

by Dipankar Sarma

Subject: Re: [PATCH 2/4] RCU: use a separate softirq


Finally, RCU gets its own softirq. With RCU used as extensively
as it is, the per-cpu tasklet used earlier amounted to a softirq
with extra overhead, since tasklets are themselves dispatched from
a softirq. Raising a dedicated softirq directly is more efficient.
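
For readers less familiar with the softirq API, the pattern this patch
switches to looks roughly like the sketch below (illustrative function
names only; the actual hunks follow):

#include <linux/interrupt.h>

/* Runs on the CPU that raised the softirq, from the softirq-exit paths. */
static void example_rcu_core(struct softirq_action *unused)
{
	/* ... process this CPU's RCU callbacks ... */
}

static void example_init(void)
{
	/* registered once; replaces the per-cpu tasklet_init() */
	open_softirq(RCU_SOFTIRQ, example_rcu_core, NULL);
}

static void example_kick(void)
{
	/* mark pending on the local CPU; replaces tasklet_schedule() */
	raise_softirq(RCU_SOFTIRQ);
}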

Signed-off-by: Dipankar Sarma <[email protected]>
---


include/linux/interrupt.h | 3 ++-
kernel/rcuclassic.c | 12 +++++-------
2 files changed, 7 insertions(+), 8 deletions(-)

diff -puN kernel/rcuclassic.c~rcu-softirq kernel/rcuclassic.c
--- linux-2.6.18-rc3-rcu/kernel/rcuclassic.c~rcu-softirq 2006-08-27 01:01:15.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/kernel/rcuclassic.c 2006-08-27 01:01:15.000000000 +0530
@@ -69,7 +69,6 @@ DEFINE_PER_CPU(struct rcu_data, rcu_data
DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };

/* Fake initialization required by compiler */
-static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
static int blimit = 10;
static int qhimark = 10000;
static int qlowmark = 100;
@@ -212,7 +211,7 @@ static void rcu_do_batch(struct rcu_data
if (!rdp->donelist)
rdp->donetail = &rdp->donelist;
else
- tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
+ raise_softirq(RCU_SOFTIRQ);
}

/*
@@ -363,7 +362,6 @@ static void rcu_offline_cpu(int cpu)
&per_cpu(rcu_bh_data, cpu));
put_cpu_var(rcu_data);
put_cpu_var(rcu_bh_data);
- tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
}

#else
@@ -375,7 +373,7 @@ static void rcu_offline_cpu(int cpu)
#endif

/*
- * This does the RCU processing work from tasklet context.
+ * This does the RCU processing work from softirq context.
*/
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
struct rcu_data *rdp)
@@ -420,7 +418,7 @@ static void __rcu_process_callbacks(stru
rcu_do_batch(rdp);
}

-static void rcu_process_callbacks(unsigned long unused)
+static void rcu_process_callbacks(struct softirq_action *unused)
{
__rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
__rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
@@ -484,7 +482,7 @@ void rcu_check_callbacks(int cpu, int us
rcu_bh_qsctr_inc(cpu);
} else if (!in_softirq())
rcu_bh_qsctr_inc(cpu);
- tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
+ raise_softirq(RCU_SOFTIRQ);
}

static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
@@ -507,7 +505,7 @@ static void __devinit rcu_online_cpu(int

rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
- tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
+ open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
}

static int __devinit rcu_cpu_notify(struct notifier_block *self,
diff -puN include/linux/interrupt.h~rcu-softirq include/linux/interrupt.h
--- linux-2.6.18-rc3-rcu/include/linux/interrupt.h~rcu-softirq 2006-08-27 01:01:15.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/include/linux/interrupt.h 2006-08-27 01:01:15.000000000 +0530
@@ -219,7 +219,8 @@ enum
NET_TX_SOFTIRQ,
NET_RX_SOFTIRQ,
BLOCK_SOFTIRQ,
- TASKLET_SOFTIRQ
+ TASKLET_SOFTIRQ,
+ RCU_SOFTIRQ /* Preferable RCU should always be the last softirq */
};

/* softirq mask and active fields moved to irq_cpustat_t in

_

2006-08-28 16:12:20

by Dipankar Sarma

Subject: Re: [PATCH 3/4] RCU: preemptible RCU implementation

From: Paul McKenney <[email protected]>

This patch implements a new version of RCU which allows its read-side
critical sections to be preempted. It uses a set of counter pairs
to keep track of the read-side critical sections and flips them
when all tasks have exited their read-side critical sections. The
details of this implementation can be found in this paper -

http://www.rdrop.com/users/paulmck/RCU/OLSrtRCU.2006.08.11a.pdf

This patch was developed as part of the -rt kernel
development and is meant to provide better latencies, since
RCU read-side critical sections no longer disable preemption.
As a consequence of keeping track of RCU readers, the readers
incur a slight overhead (possible optimizations are discussed in
the paper). This implementation co-exists with the "classic" RCU
implementation and can be selected at compile time.
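
For illustration only, here is a grossly simplified, single-threaded
sketch of the counter-pair idea (no per-CPU counters, barriers or
locking, all of which the real code in this patch handles):

/* Toy model of the counter-pair scheme; all names are made up. */
struct toy_reader {
	int nesting;	/* nested rcu_read_lock() depth */
	int idx;	/* which counter this reader bumped */
};

static int flipctr[2];		/* the counter pair */
static long completed;		/* low bit selects the "current" counter */

static void toy_read_lock(struct toy_reader *r)
{
	if (r->nesting++ == 0) {
		r->idx = completed & 0x1;	/* outermost entry */
		flipctr[r->idx]++;		/* mark this reader active */
	}
}

static void toy_read_unlock(struct toy_reader *r)
{
	if (--r->nesting == 0)
		flipctr[r->idx]--;	/* decrement the same counter */
}

/*
 * Grace-period side: a flip retires the old counter; once that counter
 * drains to zero, every reader that started before the previous flip
 * has finished, so callbacks queued before then may be invoked.
 */
static int toy_try_flip(void)
{
	if (flipctr[!(completed & 0x1)] != 0)
		return 0;	/* earlier readers still active */
	completed++;		/* new readers use the other counter */
	return 1;
}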

Signed-off-by: Paul McKenney <[email protected]>
Signed-off-by: Dipankar Sarma <[email protected]>


include/linux/rcupdate.h | 5
include/linux/rcupreempt.h | 66 ++++++
include/linux/sched.h | 6
kernel/Kconfig.preempt | 37 +++
kernel/Makefile | 4
kernel/rcupreempt.c | 464 +++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 581 insertions(+), 1 deletion(-)

diff -puN /dev/null include/linux/rcupreempt.h
--- /dev/null 2006-08-28 19:57:17.885180500 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/include/linux/rcupreempt.h 2006-08-27 11:42:15.000000000 +0530
@@ -0,0 +1,66 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion (RT implementation)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (C) IBM Corporation, 2006
+ *
+ * Author: Paul McKenney <[email protected]>
+ *
+ * Based on the original work by Paul McKenney <[email protected]>
+ * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
+ * Papers:
+ * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
+ * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * http://lse.sourceforge.net/locking/rcupdate.html
+ *
+ */
+
+#ifndef __LINUX_RCUPREEMPT_H
+#define __LINUX_RCUPREEMPT_H
+
+#ifdef __KERNEL__
+
+#include <linux/cache.h>
+#include <linux/spinlock.h>
+#include <linux/threads.h>
+#include <linux/percpu.h>
+#include <linux/cpumask.h>
+#include <linux/seqlock.h>
+
+#define rcu_qsctr_inc(cpu)
+#define rcu_bh_qsctr_inc(cpu)
+#define call_rcu_bh(head, rcu) call_rcu(head, rcu)
+
+extern void __rcu_read_lock(void);
+extern void __rcu_read_unlock(void);
+extern int rcu_pending(int cpu);
+
+#define __rcu_read_lock_bh() { rcu_read_lock(); local_bh_disable(); }
+#define __rcu_read_unlock_bh() { local_bh_enable(); rcu_read_unlock(); }
+
+#define __rcu_read_lock_nesting() (current->rcu_read_lock_nesting)
+
+extern void __synchronize_sched(void);
+
+extern void __rcu_init(void);
+extern void rcu_check_callbacks(int cpu, int user);
+extern void rcu_restart_cpu(int cpu);
+extern long rcu_batches_completed(void);
+
+#endif /* __KERNEL__ */
+#endif /* __LINUX_RCUPREEMPT_H */
diff -puN kernel/Makefile~rcu-preempt kernel/Makefile
--- linux-2.6.18-rc3-rcu/kernel/Makefile~rcu-preempt 2006-08-27 11:42:15.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/kernel/Makefile 2006-08-28 20:26:08.000000000 +0530
@@ -6,7 +6,7 @@ obj-y = sched.o fork.o exec_domain.o
exit.o itimer.o time.o softirq.o resource.o \
sysctl.o capability.o ptrace.o timer.o user.o \
signal.o sys.o kmod.o workqueue.o pid.o \
- rcupdate.o rcuclassic.o extable.o params.o posix-timers.o \
+ extable.o params.o posix-timers.o \
kthread.o wait.o kfifo.o sys_ni.o posix-cpu-timers.o mutex.o \
hrtimer.o rwsem.o

@@ -47,6 +47,8 @@ obj-$(CONFIG_DETECT_SOFTLOCKUP) += softl
obj-$(CONFIG_GENERIC_HARDIRQS) += irq/
obj-$(CONFIG_SECCOMP) += seccomp.o
obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o
+obj-$(CONFIG_CLASSIC_RCU) += rcupdate.o rcuclassic.o
+obj-$(CONFIG_PREEMPT_RCU) += rcupdate.o rcupreempt.o
obj-$(CONFIG_RELAY) += relay.o
obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o
obj-$(CONFIG_TASKSTATS) += taskstats.o
diff -puN kernel/Kconfig.preempt~rcu-preempt kernel/Kconfig.preempt
--- linux-2.6.18-rc3-rcu/kernel/Kconfig.preempt~rcu-preempt 2006-08-27 11:42:15.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/kernel/Kconfig.preempt 2006-08-28 20:26:08.000000000 +0530
@@ -63,3 +63,40 @@ config PREEMPT_BKL
Say Y here if you are building a kernel for a desktop system.
Say N if you are unsure.

+choice
+ prompt "RCU implementation type:"
+ default CLASSIC_RCU
+
+config CLASSIC_RCU
+ bool "Classic RCU"
+ help
+ This option selects the classic RCU implementation that is
+ designed for best read-side performance on non-realtime
+ systems.
+
+ Say Y if you are unsure.
+
+config PREEMPT_RCU
+ bool "Preemptible RCU"
+ help
+ This option reduces the latency of the kernel by making certain
+ RCU sections preemptible. Normally RCU code is non-preemptible, if
+ this option is selected then read-only RCU sections become
+ preemptible. This helps latency, but may expose bugs due to
+ now-naive assumptions about each RCU read-side critical section
+ remaining on a given CPU through its execution.
+
+ Say N if you are unsure.
+
+endchoice
+
+config RCU_STATS
+ bool "/proc stats for preemptible RCU read-side critical sections"
+ depends on PREEMPT_RCU
+ default y
+ help
+ This option provides /proc stats to provide debugging info for
+ the preemptible realtime RCU implementation.
+
+ Say Y here if you want to see RCU stats in /proc
+ Say N if you are unsure.
diff -puN /dev/null kernel/rcupreempt.c
--- /dev/null 2006-08-28 19:57:17.885180500 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/kernel/rcupreempt.c 2006-08-28 20:26:08.000000000 +0530
@@ -0,0 +1,464 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion, realtime implementation
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (C) IBM Corporation, 2001
+ *
+ * Authors: Paul E. McKenney <[email protected]>
+ * With thanks to Esben Nielsen, Bill Huey, and Ingo Molnar
+ * for pushing me away from locks and towards counters.
+ *
+ * Papers: http://www.rdrop.com/users/paulmck/RCU
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * Documentation/RCU/ *.txt
+ *
+ */
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/spinlock.h>
+#include <linux/smp.h>
+#include <linux/rcupdate.h>
+#include <linux/interrupt.h>
+#include <linux/sched.h>
+#include <asm/atomic.h>
+#include <linux/bitops.h>
+#include <linux/module.h>
+#include <linux/completion.h>
+#include <linux/moduleparam.h>
+#include <linux/percpu.h>
+#include <linux/notifier.h>
+#include <linux/rcupdate.h>
+#include <linux/cpu.h>
+#include <linux/random.h>
+#include <linux/delay.h>
+#include <linux/byteorder/swabb.h>
+#include <linux/cpumask.h>
+
+/*
+ * PREEMPT_RCU data structures.
+ */
+
+struct rcu_data {
+ spinlock_t lock;
+ long completed; /* Number of last completed batch. */
+ struct tasklet_struct rcu_tasklet;
+ struct rcu_head *nextlist;
+ struct rcu_head **nexttail;
+ struct rcu_head *waitlist;
+ struct rcu_head **waittail;
+ struct rcu_head *donelist;
+ struct rcu_head **donetail;
+#ifdef CONFIG_RCU_STATS
+ long n_next_length;
+ long n_next_add;
+ long n_wait_length;
+ long n_wait_add;
+ long n_done_length;
+ long n_done_add;
+ long n_done_remove;
+ atomic_t n_done_invoked;
+ long n_rcu_check_callbacks;
+ atomic_t n_rcu_try_flip1;
+ long n_rcu_try_flip2;
+ long n_rcu_try_flip3;
+ atomic_t n_rcu_try_flip_e1;
+ long n_rcu_try_flip_e2;
+ long n_rcu_try_flip_e3;
+#endif /* #ifdef CONFIG_RCU_STATS */
+};
+struct rcu_ctrlblk {
+ spinlock_t fliplock;
+ long completed; /* Number of last completed batch. */
+};
+static struct rcu_data rcu_data;
+static struct rcu_ctrlblk rcu_ctrlblk = {
+ .fliplock = SPIN_LOCK_UNLOCKED,
+ .completed = 0,
+};
+static DEFINE_PER_CPU(atomic_t [2], rcu_flipctr) =
+ { ATOMIC_INIT(0), ATOMIC_INIT(0) };
+
+/*
+ * Return the number of RCU batches processed thus far. Useful
+ * for debug and statistics.
+ */
+long rcu_batches_completed(void)
+{
+ return rcu_ctrlblk.completed;
+}
+
+void __rcu_read_lock(void)
+{
+ int flipctr;
+ unsigned long oldirq;
+
+ local_irq_save(oldirq);
+
+ if (current->rcu_read_lock_nesting++ == 0) {
+
+ /*
+ * Outermost nesting of rcu_read_lock(), so atomically
+ * increment the current counter for the current CPU.
+ */
+
+ flipctr = rcu_ctrlblk.completed & 0x1;
+ smp_read_barrier_depends();
+ current->rcu_flipctr1 = &(__get_cpu_var(rcu_flipctr)[flipctr]);
+ /* Can optimize to non-atomic on fastpath, but start simple. */
+ atomic_inc(current->rcu_flipctr1);
+ smp_mb__after_atomic_inc(); /* might optimize out... */
+ if (unlikely(flipctr != (rcu_ctrlblk.completed & 0x1))) {
+
+ /*
+ * We raced with grace-period processing (flip).
+ * Although we cannot be preempted here, there
+ * could be interrupts, ECC errors and the like,
+ * so just nail down both sides of the rcu_flipctr
+ * array for the duration of our RCU read-side
+ * critical section, preventing a second flip
+ * from racing with us. At some point, it would
+ * be safe to decrement one of the counters, but
+ * we have no way of knowing when that would be.
+ * So just decrement them both in rcu_read_unlock().
+ */
+
+ current->rcu_flipctr2 =
+ &(__get_cpu_var(rcu_flipctr)[!flipctr]);
+ /* Can again optimize to non-atomic on fastpath. */
+ atomic_inc(current->rcu_flipctr2);
+ smp_mb__after_atomic_inc(); /* might optimize out... */
+ }
+ }
+ local_irq_restore(oldirq);
+}
+
+void __rcu_read_unlock(void)
+{
+ unsigned long oldirq;
+
+ local_irq_save(oldirq);
+ if (--current->rcu_read_lock_nesting == 0) {
+
+ /*
+ * Just atomically decrement whatever we incremented.
+ * Might later want to awaken some task waiting for the
+ * grace period to complete, but keep it simple for the
+ * moment.
+ */
+
+ smp_mb__before_atomic_dec();
+ atomic_dec(current->rcu_flipctr1);
+ current->rcu_flipctr1 = NULL;
+ if (unlikely(current->rcu_flipctr2 != NULL)) {
+ atomic_dec(current->rcu_flipctr2);
+ current->rcu_flipctr2 = NULL;
+ }
+ }
+
+ local_irq_restore(oldirq);
+}
+
+static void __rcu_advance_callbacks(void)
+{
+
+ if (rcu_data.completed != rcu_ctrlblk.completed) {
+ if (rcu_data.waitlist != NULL) {
+ *rcu_data.donetail = rcu_data.waitlist;
+ rcu_data.donetail = rcu_data.waittail;
+#ifdef CONFIG_RCU_STATS
+ rcu_data.n_done_length += rcu_data.n_wait_length;
+ rcu_data.n_done_add += rcu_data.n_wait_length;
+ rcu_data.n_wait_length = 0;
+#endif /* #ifdef CONFIG_RCU_STATS */
+ }
+ if (rcu_data.nextlist != NULL) {
+ rcu_data.waitlist = rcu_data.nextlist;
+ rcu_data.waittail = rcu_data.nexttail;
+ rcu_data.nextlist = NULL;
+ rcu_data.nexttail = &rcu_data.nextlist;
+#ifdef CONFIG_RCU_STATS
+ rcu_data.n_wait_length += rcu_data.n_next_length;
+ rcu_data.n_wait_add += rcu_data.n_next_length;
+ rcu_data.n_next_length = 0;
+#endif /* #ifdef CONFIG_RCU_STATS */
+ } else {
+ rcu_data.waitlist = NULL;
+ rcu_data.waittail = &rcu_data.waitlist;
+ }
+ rcu_data.completed = rcu_ctrlblk.completed;
+ }
+}
+
+/*
+ * Attempt a single flip of the counters. Remember, a single flip does
+ * -not- constitute a grace period. Instead, the interval between
+ * a pair of consecutive flips is a grace period.
+ *
+ * If anyone is nuts enough to run this CONFIG_PREEMPT_RCU implementation
+ * on a large SMP, they might want to use a hierarchical organization of
+ * the per-CPU-counter pairs.
+ */
+static void rcu_try_flip(void)
+{
+ int cpu;
+ long flipctr;
+ unsigned long oldirq;
+
+ flipctr = rcu_ctrlblk.completed;
+#ifdef CONFIG_RCU_STATS
+ atomic_inc(&rcu_data.n_rcu_try_flip1);
+#endif /* #ifdef CONFIG_RCU_STATS */
+ if (unlikely(!spin_trylock_irqsave(&rcu_ctrlblk.fliplock, oldirq))) {
+#ifdef CONFIG_RCU_STATS
+ atomic_inc(&rcu_data.n_rcu_try_flip_e1);
+#endif /* #ifdef CONFIG_RCU_STATS */
+ return;
+ }
+ if (unlikely(flipctr != rcu_ctrlblk.completed)) {
+
+ /* Our work is done! ;-) */
+
+#ifdef CONFIG_RCU_STATS
+ rcu_data.n_rcu_try_flip_e2++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+ spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
+ return;
+ }
+ flipctr &= 0x1;
+
+ /*
+ * Check for completion of all RCU read-side critical sections
+ * that started prior to the previous flip.
+ */
+
+#ifdef CONFIG_RCU_STATS
+ rcu_data.n_rcu_try_flip2++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+ for_each_possible_cpu(cpu) {
+ if (atomic_read(&per_cpu(rcu_flipctr, cpu)[!flipctr]) != 0) {
+#ifdef CONFIG_RCU_STATS
+ rcu_data.n_rcu_try_flip_e3++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+ spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
+ return;
+ }
+ }
+
+ /* Do the flip. */
+
+ smp_mb();
+ rcu_ctrlblk.completed++;
+
+#ifdef CONFIG_RCU_STATS
+ rcu_data.n_rcu_try_flip3++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+ spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
+}
+
+void rcu_check_callbacks(int cpu, int user)
+{
+ unsigned long oldirq;
+
+ if (rcu_ctrlblk.completed == rcu_data.completed) {
+ rcu_try_flip();
+ if (rcu_ctrlblk.completed == rcu_data.completed) {
+ return;
+ }
+ }
+ spin_lock_irqsave(&rcu_data.lock, oldirq);
+#ifdef CONFIG_RCU_STATS
+ rcu_data.n_rcu_check_callbacks++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+ __rcu_advance_callbacks();
+ if (rcu_data.donelist == NULL) {
+ spin_unlock_irqrestore(&rcu_data.lock, oldirq);
+ } else {
+ spin_unlock_irqrestore(&rcu_data.lock, oldirq);
+ tasklet_schedule(&rcu_data.rcu_tasklet);
+ }
+}
+
+static void rcu_process_callbacks(unsigned long data)
+{
+ unsigned long flags;
+ struct rcu_head *next, *list;
+
+ spin_lock_irqsave(&rcu_data.lock, flags);
+ list = rcu_data.donelist;
+ if (list == NULL) {
+ spin_unlock_irqrestore(&rcu_data.lock, flags);
+ return;
+ }
+ rcu_data.donelist = NULL;
+ rcu_data.donetail = &rcu_data.donelist;
+#ifdef CONFIG_RCU_STATS
+ rcu_data.n_done_remove += rcu_data.n_done_length;
+ rcu_data.n_done_length = 0;
+#endif /* #ifdef CONFIG_RCU_STATS */
+ spin_unlock_irqrestore(&rcu_data.lock, flags);
+ while (list) {
+ next = list->next;
+ list->func(list);
+ list = next;
+#ifdef CONFIG_RCU_STATS
+ atomic_inc(&rcu_data.n_done_invoked);
+#endif /* #ifdef CONFIG_RCU_STATS */
+ }
+}
+
+void fastcall call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *rcu))
+{
+ unsigned long flags;
+
+ head->func = func;
+ head->next = NULL;
+ spin_lock_irqsave(&rcu_data.lock, flags);
+ __rcu_advance_callbacks();
+ *rcu_data.nexttail = head;
+ rcu_data.nexttail = &head->next;
+#ifdef CONFIG_RCU_STATS
+ rcu_data.n_next_add++;
+ rcu_data.n_next_length++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+ spin_unlock_irqrestore(&rcu_data.lock, flags);
+}
+
+/*
+ * Crude hack, reduces but does not eliminate possibility of failure.
+ * Needs to wait for all CPUs to pass through a -voluntary- context
+ * switch to eliminate possibility of failure. (Maybe just crank
+ * priority down...)
+ */
+void __synchronize_sched(void)
+{
+ cpumask_t oldmask;
+ int cpu;
+
+ if (sched_getaffinity(0, &oldmask) < 0) {
+ oldmask = cpu_possible_map;
+ }
+ for_each_online_cpu(cpu) {
+ sched_setaffinity(0, cpumask_of_cpu(cpu));
+ schedule();
+ }
+ sched_setaffinity(0, oldmask);
+}
+
+int rcu_pending(int cpu)
+{
+ return (rcu_data.donelist != NULL ||
+ rcu_data.waitlist != NULL ||
+ rcu_data.nextlist != NULL);
+}
+
+void __init __rcu_init(void)
+{
+/*&&&&*/printk("WARNING: experimental RCU implementation.\n");
+ spin_lock_init(&rcu_data.lock);
+ rcu_data.completed = 0;
+ rcu_data.nextlist = NULL;
+ rcu_data.nexttail = &rcu_data.nextlist;
+ rcu_data.waitlist = NULL;
+ rcu_data.waittail = &rcu_data.waitlist;
+ rcu_data.donelist = NULL;
+ rcu_data.donetail = &rcu_data.donelist;
+ tasklet_init(&rcu_data.rcu_tasklet, rcu_process_callbacks, 0UL);
+}
+
+/*
+ * Deprecated, use synchronize_rcu() or synchronize_sched() instead.
+ */
+void synchronize_kernel(void)
+{
+ synchronize_rcu();
+}
+
+#ifdef CONFIG_RCU_STATS
+int rcu_read_proc_data(char *page)
+{
+ return sprintf(page,
+ "ggp=%ld lgp=%ld rcc=%ld\n"
+ "na=%ld nl=%ld wa=%ld wl=%ld da=%ld dl=%ld dr=%ld di=%d\n"
+ "rtf1=%d rtf2=%ld rtf3=%ld rtfe1=%d rtfe2=%ld rtfe3=%ld\n",
+
+ rcu_ctrlblk.completed,
+ rcu_data.completed,
+ rcu_data.n_rcu_check_callbacks,
+
+ rcu_data.n_next_add,
+ rcu_data.n_next_length,
+ rcu_data.n_wait_add,
+ rcu_data.n_wait_length,
+ rcu_data.n_done_add,
+ rcu_data.n_done_length,
+ rcu_data.n_done_remove,
+ atomic_read(&rcu_data.n_done_invoked),
+
+ atomic_read(&rcu_data.n_rcu_try_flip1),
+ rcu_data.n_rcu_try_flip2,
+ rcu_data.n_rcu_try_flip3,
+ atomic_read(&rcu_data.n_rcu_try_flip_e1),
+ rcu_data.n_rcu_try_flip_e2,
+ rcu_data.n_rcu_try_flip_e3);
+}
+
+int rcu_read_proc_gp_data(char *page)
+{
+ long oldgp = rcu_ctrlblk.completed;
+
+ synchronize_rcu();
+ return sprintf(page, "oldggp=%ld newggp=%ld\n",
+ oldgp, rcu_ctrlblk.completed);
+}
+
+int rcu_read_proc_ptrs_data(char *page)
+{
+ return sprintf(page,
+ "nl=%p/%p nt=%p\n wl=%p/%p wt=%p dl=%p/%p dt=%p\n",
+ &rcu_data.nextlist, rcu_data.nextlist, rcu_data.nexttail,
+ &rcu_data.waitlist, rcu_data.waitlist, rcu_data.waittail,
+ &rcu_data.donelist, rcu_data.donelist, rcu_data.donetail
+ );
+}
+
+int rcu_read_proc_ctrs_data(char *page)
+{
+ int cnt = 0;
+ int cpu;
+ int f = rcu_data.completed & 0x1;
+
+ cnt += sprintf(&page[cnt], "CPU last cur\n");
+ for_each_online_cpu(cpu) {
+ cnt += sprintf(&page[cnt], "%3d %4d %3d\n",
+ cpu,
+ atomic_read(&per_cpu(rcu_flipctr, cpu)[!f]),
+ atomic_read(&per_cpu(rcu_flipctr, cpu)[f]));
+ }
+ cnt += sprintf(&page[cnt], "ggp = %ld\n", rcu_data.completed);
+ return (cnt);
+}
+
+#endif /* #ifdef CONFIG_RCU_STATS */
+
+EXPORT_SYMBOL_GPL(call_rcu);
+EXPORT_SYMBOL_GPL(rcu_batches_completed);
+EXPORT_SYMBOL_GPL(__synchronize_sched);
+EXPORT_SYMBOL_GPL(__rcu_read_lock);
+EXPORT_SYMBOL_GPL(__rcu_read_unlock);
+
diff -puN include/linux/rcupdate.h~rcu-preempt include/linux/rcupdate.h
--- linux-2.6.18-rc3-rcu/include/linux/rcupdate.h~rcu-preempt 2006-08-27 11:42:15.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/include/linux/rcupdate.h 2006-08-27 11:42:15.000000000 +0530
@@ -41,7 +41,12 @@
#include <linux/percpu.h>
#include <linux/cpumask.h>
#include <linux/seqlock.h>
+#include <linux/config.h>
+#ifdef CONFIG_CLASSIC_RCU
#include <linux/rcuclassic.h>
+#else
+#include <linux/rcupreempt.h>
+#endif

/**
* struct rcu_head - callback structure for use with RCU
diff -puN include/linux/sched.h~rcu-preempt include/linux/sched.h
--- linux-2.6.18-rc3-rcu/include/linux/sched.h~rcu-preempt 2006-08-27 11:42:15.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/include/linux/sched.h 2006-08-27 11:42:15.000000000 +0530
@@ -795,6 +795,12 @@ struct task_struct {
cpumask_t cpus_allowed;
unsigned int time_slice, first_time_slice;

+#ifdef CONFIG_PREEMPT_RCU
+ int rcu_read_lock_nesting;
+ atomic_t *rcu_flipctr1;
+ atomic_t *rcu_flipctr2;
+#endif
+
#if defined(CONFIG_SCHEDSTATS) || defined(CONFIG_TASK_DELAY_ACCT)
struct sched_info sched_info;
#endif

_

2006-08-28 16:13:22

by Dipankar Sarma

[permalink] [raw]
Subject: Re: [PATCH 4/4] RCU: clean up RCU trace


This patch consolidates the RCU tracing code in the preemptible
RCU implementation, moves it to a separate "trace" file, and
cleans up the #ifdefs. Moving to a separate file will eventually
allow dynamic tracing of the RCU implementation.
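
The transformation is mechanical; for example, the rcu_try_flip()
hunk below turns

	#ifdef CONFIG_RCU_STATS
		rcu_data.n_rcu_try_flip2++;
	#endif /* #ifdef CONFIG_RCU_STATS */

into

	RCU_TRACE(rcupreempt_trace_try_flip2, &rcu_data.trace);

where the RCU_TRACE() wrapper expands to nothing when CONFIG_RCU_TRACE=n.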

Signed-off-by: Paul McKenney <[email protected]>
Signed-off-by: Dipankar Sarma <[email protected]>
---


include/linux/rcupreempt_trace.h | 84 ++++++++++++++++++++++++++++
kernel/Kconfig.preempt | 11 +--
kernel/Makefile | 1
kernel/rcupreempt.c | 113 ++++++++++++---------------------------
kernel/rcupreempt_trace.c | 99 ++++++++++++++++++++++++++++++++++
5 files changed, 225 insertions(+), 83 deletions(-)

diff -puN /dev/null include/linux/rcupreempt_trace.h
--- /dev/null 2006-08-28 19:57:17.885180500 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/include/linux/rcupreempt_trace.h 2006-08-27 21:52:28.000000000 +0530
@@ -0,0 +1,84 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion (RT implementation)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (C) IBM Corporation, 2006
+ *
+ * Author: Paul McKenney <[email protected]>
+ *
+ * Based on the original work by Paul McKenney <[email protected]>
+ * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
+ * Papers:
+ * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
+ * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * http://lse.sourceforge.net/locking/rcupdate.html
+ *
+ */
+
+#ifndef __LINUX_RCUPREEMPT_TRACE_H
+#define __LINUX_RCUPREEMPT_TRACE_H
+
+#ifdef __KERNEL__
+#include <linux/types.h>
+#include <linux/kernel.h>
+
+#include <asm/atomic.h>
+
+/*
+ * PREEMPT_RCU data structures.
+ */
+
+struct rcupreempt_trace {
+ long next_length;
+ long next_add;
+ long wait_length;
+ long wait_add;
+ long done_length;
+ long done_add;
+ long done_remove;
+ atomic_t done_invoked;
+ long rcu_check_callbacks;
+ atomic_t rcu_try_flip1;
+ long rcu_try_flip2;
+ long rcu_try_flip3;
+ atomic_t rcu_try_flip_e1;
+ long rcu_try_flip_e2;
+ long rcu_try_flip_e3;
+};
+
+#ifdef CONFIG_RCU_TRACE
+#define RCU_TRACE(fn, arg) fn(arg);
+#else
+#define RCU_TRACE(fn, arg)
+#endif
+
+extern void rcupreempt_trace_move2done(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_move2wait(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_e1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_e2(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_e3(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip2(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip3(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_check_callbacks(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_done_remove(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_invoke(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_next_add(struct rcupreempt_trace *trace);
+
+#endif /* __KERNEL__ */
+#endif /* __LINUX_RCUPREEMPT_TRACE_H */
diff -puN kernel/Kconfig.preempt~rcu-preempt-trace kernel/Kconfig.preempt
--- linux-2.6.18-rc3-rcu/kernel/Kconfig.preempt~rcu-preempt-trace 2006-08-27 21:52:28.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/kernel/Kconfig.preempt 2006-08-27 21:52:28.000000000 +0530
@@ -90,13 +90,12 @@ config PREEMPT_RCU

endchoice

-config RCU_STATS
- bool "/proc stats for preemptible RCU read-side critical sections"
- depends on PREEMPT_RCU
+config RCU_TRACE
+ bool "Enable tracing for RCU - currently stats in /proc"
default y
help
- This option provides /proc stats to provide debugging info for
- the preemptible realtime RCU implementation.
+ This option provides tracing in RCU which presents /proc
+ stats for debugging the RCU implementation.

- Say Y here if you want to see RCU stats in /proc
+ Say Y here if you want to enable RCU tracing.
Say N if you are unsure.
diff -puN kernel/Makefile~rcu-preempt-trace kernel/Makefile
--- linux-2.6.18-rc3-rcu/kernel/Makefile~rcu-preempt-trace 2006-08-27 21:52:28.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/kernel/Makefile 2006-08-27 21:52:28.000000000 +0530
@@ -49,6 +49,7 @@ obj-$(CONFIG_SECCOMP) += seccomp.o
obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o
obj-$(CONFIG_CLASSIC_RCU) += rcupdate.o rcuclassic.o
obj-$(CONFIG_PREEMPT_RCU) += rcupdate.o rcupreempt.o
+obj-$(CONFIG_RCU_TRACE) += rcupreempt_trace.o
obj-$(CONFIG_RELAY) += relay.o
obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o
obj-$(CONFIG_TASKSTATS) += taskstats.o
diff -puN kernel/rcupreempt.c~rcu-preempt-trace kernel/rcupreempt.c
--- linux-2.6.18-rc3-rcu/kernel/rcupreempt.c~rcu-preempt-trace 2006-08-27 21:52:28.000000000 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/kernel/rcupreempt.c 2006-08-27 21:52:28.000000000 +0530
@@ -48,6 +48,7 @@
#include <linux/delay.h>
#include <linux/byteorder/swabb.h>
#include <linux/cpumask.h>
+#include <linux/rcupreempt_trace.h>

/*
* PREEMPT_RCU data structures.
@@ -63,23 +64,9 @@ struct rcu_data {
struct rcu_head **waittail;
struct rcu_head *donelist;
struct rcu_head **donetail;
-#ifdef CONFIG_RCU_STATS
- long n_next_length;
- long n_next_add;
- long n_wait_length;
- long n_wait_add;
- long n_done_length;
- long n_done_add;
- long n_done_remove;
- atomic_t n_done_invoked;
- long n_rcu_check_callbacks;
- atomic_t n_rcu_try_flip1;
- long n_rcu_try_flip2;
- long n_rcu_try_flip3;
- atomic_t n_rcu_try_flip_e1;
- long n_rcu_try_flip_e2;
- long n_rcu_try_flip_e3;
-#endif /* #ifdef CONFIG_RCU_STATS */
+#ifdef CONFIG_RCU_TRACE
+ struct rcupreempt_trace trace;
+#endif /* #ifdef CONFIG_RCU_TRACE */
};
struct rcu_ctrlblk {
spinlock_t fliplock;
@@ -180,22 +167,14 @@ static void __rcu_advance_callbacks(void
if (rcu_data.waitlist != NULL) {
*rcu_data.donetail = rcu_data.waitlist;
rcu_data.donetail = rcu_data.waittail;
-#ifdef CONFIG_RCU_STATS
- rcu_data.n_done_length += rcu_data.n_wait_length;
- rcu_data.n_done_add += rcu_data.n_wait_length;
- rcu_data.n_wait_length = 0;
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_move2done, &rcu_data.trace);
}
if (rcu_data.nextlist != NULL) {
rcu_data.waitlist = rcu_data.nextlist;
rcu_data.waittail = rcu_data.nexttail;
rcu_data.nextlist = NULL;
rcu_data.nexttail = &rcu_data.nextlist;
-#ifdef CONFIG_RCU_STATS
- rcu_data.n_wait_length += rcu_data.n_next_length;
- rcu_data.n_wait_add += rcu_data.n_next_length;
- rcu_data.n_next_length = 0;
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_move2wait, &rcu_data.trace);
} else {
rcu_data.waitlist = NULL;
rcu_data.waittail = &rcu_data.waitlist;
@@ -220,22 +199,16 @@ static void rcu_try_flip(void)
unsigned long oldirq;

flipctr = rcu_ctrlblk.completed;
-#ifdef CONFIG_RCU_STATS
- atomic_inc(&rcu_data.n_rcu_try_flip1);
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_try_flip1, &rcu_data.trace);
if (unlikely(!spin_trylock_irqsave(&rcu_ctrlblk.fliplock, oldirq))) {
-#ifdef CONFIG_RCU_STATS
- atomic_inc(&rcu_data.n_rcu_try_flip_e1);
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_try_flip_e1, &rcu_data.trace);
return;
}
if (unlikely(flipctr != rcu_ctrlblk.completed)) {

/* Our work is done! ;-) */

-#ifdef CONFIG_RCU_STATS
- rcu_data.n_rcu_try_flip_e2++;
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_try_flip_e2, &rcu_data.trace);
spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
return;
}
@@ -246,14 +219,11 @@ static void rcu_try_flip(void)
* that started prior to the previous flip.
*/

-#ifdef CONFIG_RCU_STATS
- rcu_data.n_rcu_try_flip2++;
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_try_flip2, &rcu_data.trace);
for_each_possible_cpu(cpu) {
if (atomic_read(&per_cpu(rcu_flipctr, cpu)[!flipctr]) != 0) {
-#ifdef CONFIG_RCU_STATS
- rcu_data.n_rcu_try_flip_e3++;
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_try_flip_e3,
+ &rcu_data.trace);
spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
return;
}
@@ -264,9 +234,7 @@ static void rcu_try_flip(void)
smp_mb();
rcu_ctrlblk.completed++;

-#ifdef CONFIG_RCU_STATS
- rcu_data.n_rcu_try_flip3++;
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_try_flip3, &rcu_data.trace);
spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
}

@@ -281,9 +249,7 @@ void rcu_check_callbacks(int cpu, int us
}
}
spin_lock_irqsave(&rcu_data.lock, oldirq);
-#ifdef CONFIG_RCU_STATS
- rcu_data.n_rcu_check_callbacks++;
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_check_callbacks, &rcu_data.trace);
__rcu_advance_callbacks();
if (rcu_data.donelist == NULL) {
spin_unlock_irqrestore(&rcu_data.lock, oldirq);
@@ -306,18 +272,13 @@ static void rcu_process_callbacks(unsign
}
rcu_data.donelist = NULL;
rcu_data.donetail = &rcu_data.donelist;
-#ifdef CONFIG_RCU_STATS
- rcu_data.n_done_remove += rcu_data.n_done_length;
- rcu_data.n_done_length = 0;
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_done_remove, &rcu_data.trace);
spin_unlock_irqrestore(&rcu_data.lock, flags);
while (list) {
next = list->next;
list->func(list);
list = next;
-#ifdef CONFIG_RCU_STATS
- atomic_inc(&rcu_data.n_done_invoked);
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_invoke, &rcu_data.trace);
}
}

@@ -332,10 +293,7 @@ void fastcall call_rcu(struct rcu_head *
__rcu_advance_callbacks();
*rcu_data.nexttail = head;
rcu_data.nexttail = &head->next;
-#ifdef CONFIG_RCU_STATS
- rcu_data.n_next_add++;
- rcu_data.n_next_length++;
-#endif /* #ifdef CONFIG_RCU_STATS */
+ RCU_TRACE(rcupreempt_trace_next_add, &rcu_data.trace);
spin_unlock_irqrestore(&rcu_data.lock, flags);
}

@@ -389,9 +347,10 @@ void synchronize_kernel(void)
synchronize_rcu();
}

-#ifdef CONFIG_RCU_STATS
+#ifdef CONFIG_RCU_TRACE
int rcu_read_proc_data(char *page)
{
+ struct rcupreempt_trace *trace = &rcu_data.trace;
return sprintf(page,
"ggp=%ld lgp=%ld rcc=%ld\n"
"na=%ld nl=%ld wa=%ld wl=%ld da=%ld dl=%ld dr=%ld di=%d\n"
@@ -399,23 +358,23 @@ int rcu_read_proc_data(char *page)

rcu_ctrlblk.completed,
rcu_data.completed,
- rcu_data.n_rcu_check_callbacks,
+ trace->rcu_check_callbacks,

- rcu_data.n_next_add,
- rcu_data.n_next_length,
- rcu_data.n_wait_add,
- rcu_data.n_wait_length,
- rcu_data.n_done_add,
- rcu_data.n_done_length,
- rcu_data.n_done_remove,
- atomic_read(&rcu_data.n_done_invoked),
-
- atomic_read(&rcu_data.n_rcu_try_flip1),
- rcu_data.n_rcu_try_flip2,
- rcu_data.n_rcu_try_flip3,
- atomic_read(&rcu_data.n_rcu_try_flip_e1),
- rcu_data.n_rcu_try_flip_e2,
- rcu_data.n_rcu_try_flip_e3);
+ trace->next_add,
+ trace->next_length,
+ trace->wait_add,
+ trace->wait_length,
+ trace->done_add,
+ trace->done_length,
+ trace->done_remove,
+ atomic_read(&trace->done_invoked),
+
+ atomic_read(&trace->rcu_try_flip1),
+ trace->rcu_try_flip2,
+ trace->rcu_try_flip3,
+ atomic_read(&trace->rcu_try_flip_e1),
+ trace->rcu_try_flip_e2,
+ trace->rcu_try_flip_e3);
}

int rcu_read_proc_gp_data(char *page)
@@ -454,7 +413,7 @@ int rcu_read_proc_ctrs_data(char *page)
return (cnt);
}

-#endif /* #ifdef CONFIG_RCU_STATS */
+#endif /* #ifdef CONFIG_RCU_TRACE */

EXPORT_SYMBOL_GPL(call_rcu);
EXPORT_SYMBOL_GPL(rcu_batches_completed);
diff -puN /dev/null kernel/rcupreempt_trace.c
--- /dev/null 2006-08-28 19:57:17.885180500 +0530
+++ linux-2.6.18-rc3-rcu-dipankar/kernel/rcupreempt_trace.c 2006-08-27 21:52:28.000000000 +0530
@@ -0,0 +1,99 @@
+/*
+ * Read-Copy Update tracing for realtime implementation
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (C) IBM Corporation, 2006
+ *
+ * Papers: http://www.rdrop.com/users/paulmck/RCU
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * Documentation/RCU/ *.txt
+ *
+ */
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/spinlock.h>
+#include <linux/smp.h>
+#include <linux/rcupdate.h>
+#include <linux/interrupt.h>
+#include <linux/sched.h>
+#include <asm/atomic.h>
+#include <linux/bitops.h>
+#include <linux/module.h>
+#include <linux/completion.h>
+#include <linux/moduleparam.h>
+#include <linux/percpu.h>
+#include <linux/notifier.h>
+#include <linux/rcupdate.h>
+#include <linux/cpu.h>
+#include <linux/mutex.h>
+#include <linux/rcupreempt_trace.h>
+
+void rcupreempt_trace_move2done(struct rcupreempt_trace *trace)
+{
+ trace->done_length += trace->wait_length;
+ trace->done_add += trace->wait_length;
+ trace->wait_length = 0;
+}
+void rcupreempt_trace_move2wait(struct rcupreempt_trace *trace)
+{
+ trace->wait_length += trace->next_length;
+ trace->wait_add += trace->next_length;
+ trace->next_length = 0;
+}
+void rcupreempt_trace_try_flip1(struct rcupreempt_trace *trace)
+{
+ atomic_inc(&trace->rcu_try_flip1);
+}
+void rcupreempt_trace_try_flip_e1(struct rcupreempt_trace *trace)
+{
+ atomic_inc(&trace->rcu_try_flip_e1);
+}
+void rcupreempt_trace_try_flip_e2(struct rcupreempt_trace *trace)
+{
+ trace->rcu_try_flip_e2++;
+}
+void rcupreempt_trace_try_flip_e3(struct rcupreempt_trace *trace)
+{
+ trace->rcu_try_flip_e3++;
+}
+void rcupreempt_trace_try_flip2(struct rcupreempt_trace *trace)
+{
+ trace->rcu_try_flip2++;
+}
+void rcupreempt_trace_try_flip3(struct rcupreempt_trace *trace)
+{
+ trace->rcu_try_flip3++;
+}
+void rcupreempt_trace_check_callbacks(struct rcupreempt_trace *trace)
+{
+ trace->rcu_check_callbacks++;
+}
+void rcupreempt_trace_done_remove(struct rcupreempt_trace *trace)
+{
+ trace->done_remove += trace->done_length;
+ trace->done_length = 0;
+}
+void rcupreempt_trace_invoke(struct rcupreempt_trace *trace)
+{
+ atomic_inc(&trace->done_invoked);
+}
+void rcupreempt_trace_next_add(struct rcupreempt_trace *trace)
+{
+ trace->next_add++;
+ trace->next_length++;
+}

_

2006-08-28 16:16:09

by Arjan van de Ven

[permalink] [raw]
Subject: Re: [PATCH 0/4] RCU: various merge candidates

On Mon, 2006-08-28 at 21:38 +0530, Dipankar Sarma wrote:
> This patchset consists of various merge candidates that would
> do well to have some testing in -mm. This patchset breaks
> out RCU implementation from its APIs to allow multiple
> implementations,

Hi,


can you explain why we would want multiple RCU implementations?
Isn't one going to be plenty already?

Greetings,
Arjan van de Ven
--
if you want to mail me at work (you don't), use arjan (at) linux.intel.com

2006-08-28 16:28:57

by Dipankar Sarma

[permalink] [raw]
Subject: Re: [PATCH 0/4] RCU: various merge candidates

On Mon, Aug 28, 2006 at 06:15:48PM +0200, Arjan van de Ven wrote:
> On Mon, 2006-08-28 at 21:38 +0530, Dipankar Sarma wrote:
> > This patchset consists of various merge candidates that would
> > do well to have some testing in -mm. This patchset breaks
> > out RCU implementation from its APIs to allow multiple
> > implementations,
>
>
> can you explain why we would want multiple RCU implementations?
> Isn't one going to be plenty already?

Hi Arjan,

See this for a background - http://lwn.net/Articles/129511/

Primarily, rcupreempt allows read-side critical sections to
be preempted, unlike the classic RCU currently in mainline. It is
also a bit more aggressive about grace periods, counting the
number of readers instead of relying on the periodic checks that
classic RCU does.

The hope is that it will help mainline users who look for
better latency.
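
For reference, the core of the reader-counting scheme in patch 3/4
looks roughly like this (heavily simplified - the real rcupreempt.c
code also handles nesting, interrupts, memory ordering and a race
with the counter flip):

	/* Reader entry (outermost rcu_read_lock()): */
	idx = rcu_ctrlblk.completed & 0x1;
	current->rcu_flipctr1 = &__get_cpu_var(rcu_flipctr)[idx];
	atomic_inc(current->rcu_flipctr1);

	/* Reader exit (outermost rcu_read_unlock()): */
	atomic_dec(current->rcu_flipctr1);

	/*
	 * Updater: the counter "flip" may only advance once every CPU's
	 * counter for the previous phase has drained to zero, i.e. all
	 * readers that started before the last flip have finished.
	 * Two consecutive flips make up one grace period.
	 */
	for_each_possible_cpu(cpu)
		if (atomic_read(&per_cpu(rcu_flipctr, cpu)[!idx]) != 0)
			return;		/* readers still in flight */
	rcu_ctrlblk.completed++;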

Thanks
Dipankar

2006-08-28 16:33:27

by Arjan van de Ven

[permalink] [raw]
Subject: Re: [PATCH 0/4] RCU: various merge candidates

On Mon, 2006-08-28 at 21:59 +0530, Dipankar Sarma wrote:
> On Mon, Aug 28, 2006 at 06:15:48PM +0200, Arjan van de Ven wrote:
> > On Mon, 2006-08-28 at 21:38 +0530, Dipankar Sarma wrote:
> > > This patchset consists of various merge candidates that would
> > > do well to have some testing in -mm. This patchset breaks
> > > out RCU implementation from its APIs to allow multiple
> > > implementations,
> >
> >
> > can you explain why we would want multiple RCU implementations?
> > Isn't one going to be plenty already?
>
> Hi Arjan,
>
> See this for a background - http://lwn.net/Articles/129511/
>
> Primarily, rcupreempt allows read-side critical sections to
> be preempted unline classic RCU currently in mainline. It is
> also a bit more aggressive in terms of grace periods by counting
> the number of readers as opposed to periodic checks in classic
> RCU.
>

hi,

thanks for the explanation, this for sure explains one half of the
equation; the other half is ... "why do we not always want this"?

Greetings,
Arjan van de Ven

2006-08-28 16:43:41

by Dipankar Sarma

[permalink] [raw]
Subject: Re: [PATCH 0/4] RCU: various merge candidates

On Mon, Aug 28, 2006 at 06:33:09PM +0200, Arjan van de Ven wrote:
> On Mon, 2006-08-28 at 21:59 +0530, Dipankar Sarma wrote:
> > On Mon, Aug 28, 2006 at 06:15:48PM +0200, Arjan van de Ven wrote:
> > > On Mon, 2006-08-28 at 21:38 +0530, Dipankar Sarma wrote:
> > Hi Arjan,
> >
> > See this for a background - http://lwn.net/Articles/129511/
> >
> > Primarily, rcupreempt allows read-side critical sections to
> > be preempted unline classic RCU currently in mainline. It is
> > also a bit more aggressive in terms of grace periods by counting
> > the number of readers as opposed to periodic checks in classic
> > RCU.
> >
>
> hi,
>
> thanks for the explenation, this for sure explains one half of the
> equation; the other half is ... "why do we not always want this"?

It comes with read-side overheads for keeping track of critical
sections, and we need to carefully check its impact on performance
over a wider variety of workloads before deciding to switch the
default.

See table 2 on page 10 of this paper -

http://www.rdrop.com/users/paulmck/RCU/OLSrtRCU.2006.08.11a.pdf
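
To see where the read-side cost comes from, compare the two fast
paths (condensed from patches 1/4 and 3/4; not the full code):

	/* CONFIG_CLASSIC_RCU: the read side is essentially free */
	#define __rcu_read_lock() \
		do { preempt_disable(); __acquire(RCU); } while (0)

	/* CONFIG_PREEMPT_RCU (condensed): irq disable/enable, a per-task
	   nesting count, an atomic per-CPU counter and a memory barrier */
	local_irq_save(flags);
	if (current->rcu_read_lock_nesting++ == 0) {
		atomic_inc(&__get_cpu_var(rcu_flipctr)
				[rcu_ctrlblk.completed & 0x1]);
		smp_mb__after_atomic_inc();
	}
	local_irq_restore(flags);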

Thanks
Dipankar

2006-08-28 19:06:25

by Andrew Morton

[permalink] [raw]
Subject: Re: [PATCH 0/4] RCU: various merge candidates

On Mon, 28 Aug 2006 21:38:45 +0530
Dipankar Sarma <[email protected]> wrote:

> This patchset consists of various merge candidates that would
> do well to have some testing in -mm. This patchset breaks
> out RCU implementation from its APIs to allow multiple
> implementations, gives RCU its own softirq and finally
> lines up preemptible RCU from -rt tree as a configurable
> RCU implementation for mainline.
>
> All comments and testing is welcome. RFC at the moment, but
> I can later submit patches against -mm, Andrew, if you want.
> They have been tested lightly using dbench, kernbench and ltp
> (both CONFIG_CLASSIC_RCU=y and n) on x86 and ppc64.

ouch.

akpm:/usr/src/25> grep rcu series
radix-tree-rcu-lockless-readside.patch
radix-tree-rcu-lockless-readside-update.patch
radix-tree-rcu-lockless-readside-semicolon.patch
radix-tree-rcu-lockless-readside-update-tidy.patch
radix-tree-rcu-lockless-readside-fix-2.patch
radix-tree-rcu-lockless-readside-fix-3.patch
rcu-add-lock-annotations-to-rcu_bh_torture_read_lockunlock.patch
srcu-3-rcu-variant-permitting-read-side-blocking.patch
srcu-3-rcu-variant-permitting-read-side-blocking-fix.patch
srcu-3-rcu-variant-permitting-read-side-blocking-srcu-add-lock-annotations.patch
srcu-3-add-srcu-operations-to-rcutorture.patch
srcu-3-add-srcu-operations-to-rcutorture-fix.patch
add-srcu-based-notifier-chains.patch
add-srcu-based-notifier-chains-cleanup.patch
srcu-report-out-of-memory-errors.patch
srcu-report-out-of-memory-errors-fixlet.patch
cpufreq-make-the-transition_notifier-chain-use-srcu.patch
rcu-add-module_author-to-rcutorture-module.patch
rcu-fix-incorrect-description-of-default-for-rcutorture.patch
rcu-mention-rcu_bh-in-description-of-rcutortures.patch
rcu-avoid-kthread_stop-on-invalid-pointer-if-rcutorture.patch
rcu-fix-sign-bug-making-rcu_random-always-return-the-same.patch
rcu-add-fake-writers-to-rcutorture.patch
rcu-add-fake-writers-to-rcutorture-tidy.patch

Now what?

2006-08-28 19:16:37

by Dipankar Sarma

[permalink] [raw]
Subject: Re: [PATCH 0/4] RCU: various merge candidates

On Mon, Aug 28, 2006 at 12:06:11PM -0700, Andrew Morton wrote:
> On Mon, 28 Aug 2006 21:38:45 +0530
> Dipankar Sarma <[email protected]> wrote:
>
> > This patchset consists of various merge candidates that would
> > do well to have some testing in -mm. This patchset breaks
> > out RCU implementation from its APIs to allow multiple
> > implementations, gives RCU its own softirq and finally
> > lines up preemptible RCU from -rt tree as a configurable
> > RCU implementation for mainline.
> >
> > All comments and testing is welcome. RFC at the moment, but
> > I can later submit patches against -mm, Andrew, if you want.
> > They have been tested lightly using dbench, kernbench and ltp
> > (both CONFIG_CLASSIC_RCU=y and n) on x86 and ppc64.
>
> ouch.
>
> akpm:/usr/src/25> grep rcu series
> radix-tree-rcu-lockless-readside.patch
> radix-tree-rcu-lockless-readside-update.patch
> radix-tree-rcu-lockless-readside-semicolon.patch
> radix-tree-rcu-lockless-readside-update-tidy.patch
> radix-tree-rcu-lockless-readside-fix-2.patch
> radix-tree-rcu-lockless-readside-fix-3.patch

Not related to RCU implementation.

> rcu-add-lock-annotations-to-rcu_bh_torture_read_lockunlock.patch

rcutorture (test module) patch independent of the implementation changes.

> srcu-3-rcu-variant-permitting-read-side-blocking.patch
> srcu-3-rcu-variant-permitting-read-side-blocking-fix.patch
> srcu-3-rcu-variant-permitting-read-side-blocking-srcu-add-lock-annotations.patch
> srcu-3-add-srcu-operations-to-rcutorture.patch
> srcu-3-add-srcu-operations-to-rcutorture-fix.patch
> add-srcu-based-notifier-chains.patch
> add-srcu-based-notifier-chains-cleanup.patch
> srcu-report-out-of-memory-errors.patch
> srcu-report-out-of-memory-errors-fixlet.patch
> cpufreq-make-the-transition_notifier-chain-use-srcu.patch

srcu (sleepable rcu) patches independent of the core RCU implementation
changes in the patchset. You can queue these up either before
or after this patchset.


> rcu-add-module_author-to-rcutorture-module.patch
> rcu-fix-incorrect-description-of-default-for-rcutorture.patch
> rcu-mention-rcu_bh-in-description-of-rcutortures.patch
> rcu-avoid-kthread_stop-on-invalid-pointer-if-rcutorture.patch
> rcu-fix-sign-bug-making-rcu_random-always-return-the-same.patch
> rcu-add-fake-writers-to-rcutorture.patch
> rcu-add-fake-writers-to-rcutorture-tidy.patch

rcutorture fix patches independent of rcu implementation changes
in this patchset.

>
> Now what?

Heh. I can always re-submit against -mm after I wait for a day or two
for comments :) Or I can wait. I think rcutorture patches are
fairly safe to merge and should go in soon. srcu and the patchset
I mailed today should probably get more testing in -mm before
going in.

Thanks
Dipankar


2006-08-28 19:41:19

by Andrew Morton

[permalink] [raw]
Subject: Re: [PATCH 0/4] RCU: various merge candidates

On Tue, 29 Aug 2006 00:46:42 +0530
Dipankar Sarma <[email protected]> wrote:

> srcu (sleepable rcu) patches independent of the core RCU implementation
> changes in the patchset. You can queue these up either before
> or after srcu.
>
> ...
>
> rcutorture fix patches independent of rcu implementation changes
> in this patchset.

So this patchset is largely orthogonal to the presently-queued stuff?

> >
> > Now what?
>
> Heh. I can always re-submit against -mm after I wait for a day or two
> for comments :)

That would be good, thanks. We were seriously considering merging all the
SRCU stuff for 2.6.18, because
cpufreq-make-the-transition_notifier-chain-use-srcu.patch fixes a cpufreq
down()-in-irq-disabled warning at suspend time.

But that's a lot of new stuff just to fix a warning about something which
won't actually cause any misbehaviour. We could just as well do

	if (irqs_disabled())
		down_read_trylock(...);		/* suspend */
	else
		down_read(...);

in cpufreq to temporarily shut the thing up.


2006-08-28 20:46:35

by Christoph Hellwig

[permalink] [raw]
Subject: Re: [PATCH 3/4] RCU: preemptible RCU implementation

On Mon, Aug 28, 2006 at 09:42:22PM +0530, Dipankar Sarma wrote:
> From: Paul McKenney <[email protected]>
>
> This patch implements a new version of RCU which allows its read-side
> critical sections to be preempted. It uses a set of counter pairs
> to keep track of the read-side critical sections and flips them
> when all tasks exit read-side critical section. The details
> of this implementation can be found in this paper -
>
> http://www.rdrop.com/users/paulmck/RCU/OLSrtRCU.2006.08.11a.pdf
>
> This patch was developed as a part of the -rt kernel
> development and meant to provide better latencies when
> read-side critical sections of RCU don't disable preemption.
> As a consequence of keeping track of RCU readers, the readers
> have a slight overhead (optimizations in the paper).
> This implementation co-exists with the "classic" RCU
> implementations and can be switched to at compiler.

NACK. While a readers-can-sleep RCU version definitely has its
uses, we should make it all or nothing. Either we always guarantee
that an RCU reader can sleep, or never, without external patches.
Having this as a config option is the ultimate defeat for any kind
of bug reproducibility.

Please make the patch unconditional and see if it doesn't cause
any significant slowdowns in production-like scenarios, and then
we can switch over to the readers-can-sleep variant unconditionally
at some point.

2006-08-29 00:22:55

by Dipankar Sarma

[permalink] [raw]
Subject: Re: [PATCH 0/4] RCU: various merge candidates

On Mon, Aug 28, 2006 at 12:40:58PM -0700, Andrew Morton wrote:
> On Tue, 29 Aug 2006 00:46:42 +0530
> Dipankar Sarma <[email protected]> wrote:
>
> >
> > rcutorture fix patches independent of rcu implementation changes
> > in this patchset.
>
> So this patchset is largely orthogonal to the presently-queued stuff?

Yes, it should be.

> > > Now what?
> >
> > Heh. I can always re-submit against -mm after I wait for a day or two
> > for comments :)
>
> That would be good, thanks. We were seriously considering merging all the
> SRCU stuff for 2.6.18, because

I think the non-srcu rcutorture patches can be merged in 2.6.19. srcu
is a tossup. Perhaps srcu and this patchset may be merge candidates
for 2.6.20, should things go well in review and testing. Should I
re-submit against 2.6.18-mm1 or so (after your patchset reduces in
size)? What is a convenient time?

> cpufreq-make-the-transition_notifier-chain-use-srcu.patch fixes a cpufreq
> down()-in-irq-disabled warning at suspend time.
>
> But that's a lot of new stuff just to fix a warning about something which
> won't actually cause any misbehaviour. We could just as well do
>
> if (irqs_disabled())
> down_read_trylock(...); /* suspend */
> else
> down_read(...);
>
> in cpufreq to temporarily shut the thing up.

GAh! cpufreq. I am already having to look at all of cpufreq and the
cpufreq drivers, and to change the notifiers and the whole locking
model for the other (hotplug) cleanup. I will keep this in mind.

Thanks
Dipankar

2006-08-29 00:28:42

by Andrew Morton

[permalink] [raw]
Subject: Re: [PATCH 0/4] RCU: various merge candidates

On Tue, 29 Aug 2006 05:53:02 +0530
Dipankar Sarma <[email protected]> wrote:

> > >
> > > rcutorture fix patches independent of rcu implementation changes
> > > in this patchset.
> >
> > So this patchset is largely orthogonal to the presently-queued stuff?
>
> Yes, it should be.

OK.

> > > > Now what?
> > >
> > > Heh. I can always re-submit against -mm after I wait for a day or two
> > > for comments :)
> >
> > That would be good, thanks. We were seriously considering merging all the
> > SRCU stuff for 2.6.18, because
>
> I think non-srcu rcutorture patches can be merged in 2.6.19. srcu
> is a tossup. Perhaps srcu and this patchset may be merge candidates
> for 2.6.20 should things go well in review and testing.

Oh. I was planning on merging *rcu* into 2.6.19-rc1.

> Should I re-submit
> against 2.6.18-mm1 or so (after your patchset reduces in size) ?
> What is a convenient time ?

Any time..

> GAh! cpufreq.

heh.

2006-08-29 01:33:10

by Dipankar Sarma

[permalink] [raw]
Subject: Re: [PATCH 3/4] RCU: preemptible RCU implementation

On Mon, Aug 28, 2006 at 09:46:11PM +0100, Christoph Hellwig wrote:
> On Mon, Aug 28, 2006 at 09:42:22PM +0530, Dipankar Sarma wrote:
> > From: Paul McKenney <[email protected]>
> > http://www.rdrop.com/users/paulmck/RCU/OLSrtRCU.2006.08.11a.pdf
> >
> > This patch was developed as a part of the -rt kernel
> > development and meant to provide better latencies when
> > read-side critical sections of RCU don't disable preemption.
> > As a consequence of keeping track of RCU readers, the readers
> > have a slight overhead (optimizations in the paper).
> > This implementation co-exists with the "classic" RCU
> > implementations and can be switched to at compiler.
>
> NACK. While a readers can sleep rcu version definitly has it's
> we should make it all or nothing. Either we always gurantee that
> a rcu reader can sleep or never without external patches. Having
> this a config option is the ultimate defeat for any kind of bug
> reproducabilility.

Good point. RCU users that want to sleep in their read-side
critical sections should be using the *srcu* APIs, which are separate
from the RCU APIs - srcu_read_lock(), srcu_read_unlock(),
synchronize_srcu(). I think of CONFIG_PREEMPT_RCU as similar to
CONFIG_PREEMPT, where preemption is allowed in certain sections of
the kernel code. This makes even more sense once CONFIG_PREEMPT_RT
is in mainline in some form. I should perhaps put in explicit checks
to disallow people from sleeping in RCU read-side
critical sections.
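
For reference, a minimal sketch of how such users would do it
("my_srcu" is just an illustrative name; each srcu_struct has its
own, independent grace periods):

	static struct srcu_struct my_srcu;
	int idx;

	init_srcu_struct(&my_srcu);		/* once, at init time */

	/* Read side - sleeping is legal inside this critical section. */
	idx = srcu_read_lock(&my_srcu);
	...
	srcu_read_unlock(&my_srcu, idx);

	/* Update side - waits only for readers of this srcu_struct. */
	synchronize_srcu(&my_srcu);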

> Please make the patch undconditional and see if it doesn't cause
> any significant slowdowns in production-like scenaries and then
> we can switch over to the readers can sleep variant unconditionally
> at some point.

It is still some way from getting there. It needs per-cpu callback
queues, for which I am working on a patch. It also needs some
more of Paul's work to reduce read-side overheads. However,
it is reasonably useful on low-end SMP systems for workloads
requiring better scheduling latencies, so I see no reason
not to provide this for CONFIG_PREEMPT users. Besides,
this is one step forward towards merging the "crazy" stuff from
-rt :)

Thanks
Dipankar

2006-08-30 00:40:15

by Paul E. McKenney

[permalink] [raw]
Subject: Re: [PATCH 0/4] RCU: various merge candidates

On Mon, Aug 28, 2006 at 12:40:58PM -0700, Andrew Morton wrote:
> On Tue, 29 Aug 2006 00:46:42 +0530
> Dipankar Sarma <[email protected]> wrote:
>
> > srcu (sleepable rcu) patches independent of the core RCU implementation
> > changes in the patchset. You can queue these up either before
> > or after srcu.
> >
> > ...
> >
> > rcutorture fix patches independent of rcu implementation changes
> > in this patchset.
>
> So this patchset is largely orthogonal to the presently-queued stuff?
>
> > >
> > > Now what?
> >
> > Heh. I can always re-submit against -mm after I wait for a day or two
> > for comments :)
>
> That would be good, thanks. We were seriously considering merging all the
> SRCU stuff for 2.6.18, because
> cpufreq-make-the-transition_notifier-chain-use-srcu.patch fixes a cpufreq
> down()-in-irq-disabled warning at suspend time.
>
> But that's a lot of new stuff just to fix a warning about something which
> won't actually cause any misbehaviour. We could just as well do
>
> if (irqs_disabled())
> down_read_trylock(...); /* suspend */
> else
> down_read(...);
>
> in cpufreq to temporarily shut the thing up.

I re-reviewed SRCU and found no issues. So I am OK with it going upstream
if it is useful.

I do have a comment patch below to flag an "attractive nuisance".
Several people have asked about moving the final synchronize_sched()
out of the critical section, but this turns out to be not just scary,
but actually unsafe. ;-)

Again, this patch just adds verbiage to an existing comment.

Signed-off-by: Paul E. McKenney <[email protected]>
---

diff -urpNa -X dontdiff linux-2.6.18-rc2-mm1/kernel/srcu.c linux-2.6.18-rc2-mm1-srcu-comment/kernel/srcu.c
--- linux-2.6.18-rc2-mm1/kernel/srcu.c 2006-08-05 16:30:19.000000000 -0700
+++ linux-2.6.18-rc2-mm1-srcu-comment/kernel/srcu.c 2006-08-29 17:29:30.000000000 -0700
@@ -212,6 +212,25 @@ void synchronize_srcu(struct srcu_struct
* More importantly, it also forces the corresponding SRCU read-side
* critical sections to have also completed, and the corresponding
* references to SRCU-protected data items to be dropped.
+ *
+ * Note:
+ *
+ * Despite what you might think at first glance, the
+ * preceding synchronize_sched() -must- be within the
+ * critical section ended by the following mutex_unlock().
+ * Otherwise, a task taking the early exit can race
+ * with a srcu_read_unlock(), which might have executed
+ * just before the preceding srcu_readers_active() check,
+ * and whose CPU might have reordered the srcu_read_unlock()
+ * with the preceding critical section. In this case, there
+ * is nothing preventing the synchronize_sched() task that is
+ * taking the early exit from freeing a data structure that
+ * is still being referenced (out of order) by the task
+ * doing the srcu_read_unlock().
+ *
+ * Alternatively, the comparison with "2" on the early exit
+ * could be changed to "3", but this increases synchronize_srcu()
+ * latency for bulk loads. So the current code is preferred.
*/

mutex_unlock(&sp->mutex);

2006-08-31 01:12:11

by Paul E. McKenney

[permalink] [raw]
Subject: Re: [PATCH 1/4] RCU: split classic rcu

On Mon, Aug 28, 2006 at 09:40:11PM +0530, Dipankar Sarma wrote:
>
> This patch re-organizes the RCU code to enable multiple implementations
> of RCU. Users of RCU continues to include rcupdate.h and the
> RCU interfaces remain the same. This is in preparation for
> subsequently merging the preepmtpible RCU implementation.

Acked-by: Paul E. McKenney <[email protected]>

> Signed-off-by: Dipankar Sarma <[email protected]>
> ---
>
>
>
>
> include/linux/rcuclassic.h | 149 +++++++++++
> include/linux/rcupdate.h | 153 +++---------
> kernel/Makefile | 2
> kernel/rcuclassic.c | 558 ++++++++++++++++++++++++++++++++++++++++++++
> kernel/rcupdate.c | 559 ++-------------------------------------------
> 5 files changed, 781 insertions(+), 640 deletions(-)
>
> diff -puN /dev/null include/linux/rcuclassic.h
> --- /dev/null 2006-08-26 20:47:46.475534750 +0530
> +++ linux-2.6.18-rc3-rcu-dipankar/include/linux/rcuclassic.h 2006-08-27 00:52:40.000000000 +0530
> @@ -0,0 +1,149 @@
> +/*
> + * Read-Copy Update mechanism for mutual exclusion (classic version)
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
> + *
> + * Copyright (C) IBM Corporation, 2001
> + *
> + * Author: Dipankar Sarma <[email protected]>
> + *
> + * Based on the original work by Paul McKenney <[email protected]>
> + * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
> + * Papers:
> + * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
> + * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
> + *
> + * For detailed explanation of Read-Copy Update mechanism see -
> + * http://lse.sourceforge.net/locking/rcupdate.html
> + *
> + */
> +
> +#ifndef __LINUX_RCUCLASSIC_H
> +#define __LINUX_RCUCLASSIC_H
> +
> +#ifdef __KERNEL__
> +
> +#include <linux/cache.h>
> +#include <linux/spinlock.h>
> +#include <linux/threads.h>
> +#include <linux/percpu.h>
> +#include <linux/cpumask.h>
> +#include <linux/seqlock.h>
> +
> +
> +/* Global control variables for rcupdate callback mechanism. */
> +struct rcu_ctrlblk {
> + long cur; /* Current batch number. */
> + long completed; /* Number of the last completed batch */
> + int next_pending; /* Is the next batch already waiting? */
> +
> + spinlock_t lock ____cacheline_internodealigned_in_smp;
> + cpumask_t cpumask; /* CPUs that need to switch in order */
> + /* for current batch to proceed. */
> +} ____cacheline_internodealigned_in_smp;
> +
> +/* Is batch a before batch b ? */
> +static inline int rcu_batch_before(long a, long b)
> +{
> + return (a - b) < 0;
> +}
> +
> +/* Is batch a after batch b ? */
> +static inline int rcu_batch_after(long a, long b)
> +{
> + return (a - b) > 0;
> +}
> +
> +/*
> + * Per-CPU data for Read-Copy UPdate.
> + * nxtlist - new callbacks are added here
> + * curlist - current batch for which quiescent cycle started if any
> + */
> +struct rcu_data {
> + /* 1) quiescent state handling : */
> + long quiescbatch; /* Batch # for grace period */
> + int passed_quiesc; /* User-mode/idle loop etc. */
> + int qs_pending; /* core waits for quiesc state */
> +
> + /* 2) batch handling */
> + long batch; /* Batch # for current RCU batch */
> + struct rcu_head *nxtlist;
> + struct rcu_head **nxttail;
> + long qlen; /* # of queued callbacks */
> + struct rcu_head *curlist;
> + struct rcu_head **curtail;
> + struct rcu_head *donelist;
> + struct rcu_head **donetail;
> + long blimit; /* Upper limit on a processed batch */
> + int cpu;
> +#ifdef CONFIG_SMP
> + long last_rs_qlen; /* qlen during the last resched */
> +#endif
> +};
> +
> +DECLARE_PER_CPU(struct rcu_data, rcu_data);
> +DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);
> +
> +/*
> + * Increment the quiescent state counter.
> + * The counter is a bit degenerated: We do not need to know
> + * how many quiescent states passed, just if there was at least
> + * one since the start of the grace period. Thus just a flag.
> + */
> +static inline void rcu_qsctr_inc(int cpu)
> +{
> + struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> + rdp->passed_quiesc = 1;
> +}
> +static inline void rcu_bh_qsctr_inc(int cpu)
> +{
> + struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
> + rdp->passed_quiesc = 1;
> +}
> +
> +extern int rcu_pending(int cpu);
> +extern int rcu_needs_cpu(int cpu);
> +
> +#define __rcu_read_lock() \
> + do { \
> + preempt_disable(); \
> + __acquire(RCU); \
> + } while(0)
> +#define __rcu_read_unlock() \
> + do { \
> + __release(RCU); \
> + preempt_enable(); \
> + } while(0)
> +
> +#define __rcu_read_lock_bh() \
> + do { \
> + local_bh_disable(); \
> + __acquire(RCU_BH); \
> + } while(0)
> +#define __rcu_read_unlock_bh() \
> + do { \
> + __release(RCU_BH); \
> + local_bh_enable(); \
> + } while(0)
> +
> +#define __synchronize_sched() synchronize_rcu()
> +
> +extern void __rcu_init(void);
> +extern void rcu_check_callbacks(int cpu, int user);
> +extern void rcu_restart_cpu(int cpu);
> +extern long rcu_batches_completed(void);
> +
> +#endif /* __KERNEL__ */
> +#endif /* __LINUX_RCUCLASSIC_H */
> diff -puN include/linux/rcupdate.h~rcu-split-classic include/linux/rcupdate.h
> --- linux-2.6.18-rc3-rcu/include/linux/rcupdate.h~rcu-split-classic 2006-08-06 03:07:10.000000000 +0530
> +++ linux-2.6.18-rc3-rcu-dipankar/include/linux/rcupdate.h 2006-08-27 00:48:50.000000000 +0530
> @@ -41,6 +41,7 @@
> #include <linux/percpu.h>
> #include <linux/cpumask.h>
> #include <linux/seqlock.h>
> +#include <linux/rcuclassic.h>
>
> /**
> * struct rcu_head - callback structure for use with RCU
> @@ -59,81 +60,6 @@ struct rcu_head {
> } while (0)
>
>
> -
> -/* Global control variables for rcupdate callback mechanism. */
> -struct rcu_ctrlblk {
> - long cur; /* Current batch number. */
> - long completed; /* Number of the last completed batch */
> - int next_pending; /* Is the next batch already waiting? */
> -
> - spinlock_t lock ____cacheline_internodealigned_in_smp;
> - cpumask_t cpumask; /* CPUs that need to switch in order */
> - /* for current batch to proceed. */
> -} ____cacheline_internodealigned_in_smp;
> -
> -/* Is batch a before batch b ? */
> -static inline int rcu_batch_before(long a, long b)
> -{
> - return (a - b) < 0;
> -}
> -
> -/* Is batch a after batch b ? */
> -static inline int rcu_batch_after(long a, long b)
> -{
> - return (a - b) > 0;
> -}
> -
> -/*
> - * Per-CPU data for Read-Copy UPdate.
> - * nxtlist - new callbacks are added here
> - * curlist - current batch for which quiescent cycle started if any
> - */
> -struct rcu_data {
> - /* 1) quiescent state handling : */
> - long quiescbatch; /* Batch # for grace period */
> - int passed_quiesc; /* User-mode/idle loop etc. */
> - int qs_pending; /* core waits for quiesc state */
> -
> - /* 2) batch handling */
> - long batch; /* Batch # for current RCU batch */
> - struct rcu_head *nxtlist;
> - struct rcu_head **nxttail;
> - long qlen; /* # of queued callbacks */
> - struct rcu_head *curlist;
> - struct rcu_head **curtail;
> - struct rcu_head *donelist;
> - struct rcu_head **donetail;
> - long blimit; /* Upper limit on a processed batch */
> - int cpu;
> - struct rcu_head barrier;
> -#ifdef CONFIG_SMP
> - long last_rs_qlen; /* qlen during the last resched */
> -#endif
> -};
> -
> -DECLARE_PER_CPU(struct rcu_data, rcu_data);
> -DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);
> -
> -/*
> - * Increment the quiescent state counter.
> - * The counter is a bit degenerated: We do not need to know
> - * how many quiescent states passed, just if there was at least
> - * one since the start of the grace period. Thus just a flag.
> - */
> -static inline void rcu_qsctr_inc(int cpu)
> -{
> - struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> - rdp->passed_quiesc = 1;
> -}
> -static inline void rcu_bh_qsctr_inc(int cpu)
> -{
> - struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
> - rdp->passed_quiesc = 1;
> -}
> -
> -extern int rcu_pending(int cpu);
> -extern int rcu_needs_cpu(int cpu);
> -
> /**
> * rcu_read_lock - mark the beginning of an RCU read-side critical section.
> *
> @@ -163,22 +89,14 @@ extern int rcu_needs_cpu(int cpu);
> *
> * It is illegal to block while in an RCU read-side critical section.
> */
> -#define rcu_read_lock() \
> - do { \
> - preempt_disable(); \
> - __acquire(RCU); \
> - } while(0)
> +#define rcu_read_lock() __rcu_read_lock()
>
> /**
> * rcu_read_unlock - marks the end of an RCU read-side critical section.
> *
> * See rcu_read_lock() for more information.
> */
> -#define rcu_read_unlock() \
> - do { \
> - __release(RCU); \
> - preempt_enable(); \
> - } while(0)
> +#define rcu_read_unlock() __rcu_read_unlock()
>
> /*
> * So where is rcu_write_lock()? It does not exist, as there is no
> @@ -201,23 +119,15 @@ extern int rcu_needs_cpu(int cpu);
> * can use just rcu_read_lock().
> *
> */
> -#define rcu_read_lock_bh() \
> - do { \
> - local_bh_disable(); \
> - __acquire(RCU_BH); \
> - } while(0)
> -
> -/*
> +#define rcu_read_lock_bh() __rcu_read_lock_bh()
> +
> +/**
> * rcu_read_unlock_bh - marks the end of a softirq-only RCU critical section
> *
> * See rcu_read_lock_bh() for more information.
> */
> -#define rcu_read_unlock_bh() \
> - do { \
> - __release(RCU_BH); \
> - local_bh_enable(); \
> - } while(0)
> -
> +#define rcu_read_unlock_bh() __rcu_read_unlock_bh()
> +
> /**
> * rcu_dereference - fetch an RCU-protected pointer in an
> * RCU read-side critical section. This pointer may later
> @@ -268,22 +178,49 @@ extern int rcu_needs_cpu(int cpu);
> * In "classic RCU", these two guarantees happen to be one and
> * the same, but can differ in realtime RCU implementations.
> */
> -#define synchronize_sched() synchronize_rcu()
> +#define synchronize_sched() __synchronize_sched()
> +
> +/**
> + * call_rcu - Queue an RCU callback for invocation after a grace period.
> + * @head: structure to be used for queueing the RCU updates.
> + * @func: actual update function to be invoked after the grace period
> + *
> + * The update function will be invoked some time after a full grace
> + * period elapses, in other words after all currently executing RCU
> + * read-side critical sections have completed. RCU read-side critical
> + * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
> + * and may be nested.
> + */
> +extern void FASTCALL(call_rcu(struct rcu_head *head,
> + void (*func)(struct rcu_head *head)));
>
> -extern void rcu_init(void);
> -extern void rcu_check_callbacks(int cpu, int user);
> -extern void rcu_restart_cpu(int cpu);
> -extern long rcu_batches_completed(void);
> -extern long rcu_batches_completed_bh(void);
>
> -/* Exported interfaces */
> -extern void FASTCALL(call_rcu(struct rcu_head *head,
> - void (*func)(struct rcu_head *head)));
> +/**
> + * call_rcu_bh - Queue an RCU callback for invocation after a quicker grace period.
> + * @head: structure to be used for queueing the RCU updates.
> + * @func: actual update function to be invoked after the grace period
> + *
> + * The update function will be invoked some time after a full grace
> + * period elapses, in other words after all currently executing RCU
> + * read-side critical sections have completed. call_rcu_bh() assumes
> + * that the read-side critical sections end on completion of a softirq
> + * handler. This means that read-side critical sections in process
> + * context must not be interrupted by softirqs. This interface is to be
> + * used when most of the read-side critical sections are in softirq context.
> + * RCU read-side critical sections are delimited by rcu_read_lock() and
> + * rcu_read_unlock(), if in interrupt context, or by rcu_read_lock_bh()
> + * and rcu_read_unlock_bh(), if in process context. These may be nested.
> + */
> extern void FASTCALL(call_rcu_bh(struct rcu_head *head,
> void (*func)(struct rcu_head *head)));
> +
> +/* Exported common interfaces */
> extern void synchronize_rcu(void);
> -void synchronize_idle(void);
> extern void rcu_barrier(void);
> +
> +/* Internal to kernel */
> +extern void rcu_init(void);
> +extern void rcu_check_callbacks(int cpu, int user);
>
> #endif /* __KERNEL__ */
> #endif /* __LINUX_RCUPDATE_H */
> diff -puN kernel/Makefile~rcu-split-classic kernel/Makefile
> --- linux-2.6.18-rc3-rcu/kernel/Makefile~rcu-split-classic 2006-08-06 03:07:10.000000000 +0530
> +++ linux-2.6.18-rc3-rcu-dipankar/kernel/Makefile 2006-08-27 00:48:50.000000000 +0530
> @@ -6,7 +6,7 @@ obj-y = sched.o fork.o exec_domain.o
> exit.o itimer.o time.o softirq.o resource.o \
> sysctl.o capability.o ptrace.o timer.o user.o \
> signal.o sys.o kmod.o workqueue.o pid.o \
> - rcupdate.o extable.o params.o posix-timers.o \
> + rcupdate.o rcuclassic.o extable.o params.o posix-timers.o \
> kthread.o wait.o kfifo.o sys_ni.o posix-cpu-timers.o mutex.o \
> hrtimer.o rwsem.o
>
> diff -puN /dev/null kernel/rcuclassic.c
> --- /dev/null 2006-08-26 20:47:46.475534750 +0530
> +++ linux-2.6.18-rc3-rcu-dipankar/kernel/rcuclassic.c 2006-08-27 00:49:58.000000000 +0530
> @@ -0,0 +1,558 @@
> +/*
> + * Read-Copy Update mechanism for mutual exclusion, classic implementation
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
> + *
> + * Copyright (C) IBM Corporation, 2001
> + *
> + * Authors: Dipankar Sarma <[email protected]>
> + * Manfred Spraul <[email protected]>
> + *
> + * Based on the original work by Paul McKenney <[email protected]>
> + * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
> + *
> + * Papers: http://www.rdrop.com/users/paulmck/RCU
> + *
> + * For detailed explanation of Read-Copy Update mechanism see -
> + * Documentation/RCU/ *.txt
> + *
> + */
> +#include <linux/types.h>
> +#include <linux/kernel.h>
> +#include <linux/init.h>
> +#include <linux/spinlock.h>
> +#include <linux/smp.h>
> +#include <linux/rcupdate.h>
> +#include <linux/interrupt.h>
> +#include <linux/sched.h>
> +#include <asm/atomic.h>
> +#include <linux/bitops.h>
> +#include <linux/module.h>
> +#include <linux/completion.h>
> +#include <linux/moduleparam.h>
> +#include <linux/percpu.h>
> +#include <linux/notifier.h>
> +#include <linux/rcupdate.h>
> +#include <linux/cpu.h>
> +#include <linux/random.h>
> +#include <linux/delay.h>
> +#include <linux/byteorder/swabb.h>
> +
> +
> +/* Definition for rcupdate control block. */
> +static struct rcu_ctrlblk rcu_ctrlblk = {
> + .cur = -300,
> + .completed = -300,
> + .lock = SPIN_LOCK_UNLOCKED,
> + .cpumask = CPU_MASK_NONE,
> +};
> +static struct rcu_ctrlblk rcu_bh_ctrlblk = {
> + .cur = -300,
> + .completed = -300,
> + .lock = SPIN_LOCK_UNLOCKED,
> + .cpumask = CPU_MASK_NONE,
> +};
> +
> +DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
> +DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
> +
> +/* Fake initialization required by compiler */
> +static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
> +static int blimit = 10;
> +static int qhimark = 10000;
> +static int qlowmark = 100;
> +#ifdef CONFIG_SMP
> +static int rsinterval = 1000;
> +#endif
> +
> +#ifdef CONFIG_SMP
> +static void force_quiescent_state(struct rcu_data *rdp,
> + struct rcu_ctrlblk *rcp)
> +{
> + int cpu;
> + cpumask_t cpumask;
> + set_need_resched();
> + if (unlikely(rdp->qlen - rdp->last_rs_qlen > rsinterval)) {
> + rdp->last_rs_qlen = rdp->qlen;
> + /*
> + * Don't send IPI to itself. With irqs disabled,
> + * rdp->cpu is the current cpu.
> + */
> + cpumask = rcp->cpumask;
> + cpu_clear(rdp->cpu, cpumask);
> + for_each_cpu_mask(cpu, cpumask)
> + smp_send_reschedule(cpu);
> + }
> +}
> +#else
> +static inline void force_quiescent_state(struct rcu_data *rdp,
> + struct rcu_ctrlblk *rcp)
> +{
> + set_need_resched();
> +}
> +#endif
> +
> +/*
> + * call_rcu - Queue an RCU callback for invocation after a grace period.
> + * @head: structure to be used for queueing the RCU updates.
> + * @func: actual update function to be invoked after the grace period
> + *
> + * The update function will be invoked some time after a full grace
> + * period elapses, in other words after all currently executing RCU
> + * read-side critical sections have completed. RCU read-side critical
> + * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
> + * and may be nested.
> + */
> +void fastcall call_rcu(struct rcu_head *head,
> + void (*func)(struct rcu_head *rcu))
> +{
> + unsigned long flags;
> + struct rcu_data *rdp;
> +
> + head->func = func;
> + head->next = NULL;
> + local_irq_save(flags);
> + rdp = &__get_cpu_var(rcu_data);
> + *rdp->nxttail = head;
> + rdp->nxttail = &head->next;
> + if (unlikely(++rdp->qlen > qhimark)) {
> + rdp->blimit = INT_MAX;
> + force_quiescent_state(rdp, &rcu_ctrlblk);
> + }
> + local_irq_restore(flags);
> +}
> +
> +/*
> + * call_rcu_bh - Queue an RCU callback for invocation after a quicker grace period.
> + * @head: structure to be used for queueing the RCU updates.
> + * @func: actual update function to be invoked after the grace period
> + *
> + * The update function will be invoked some time after a full grace
> + * period elapses, in other words after all currently executing RCU
> + * read-side critical sections have completed. call_rcu_bh() assumes
> + * that the read-side critical sections end on completion of a softirq
> + * handler. This means that read-side critical sections in process
> + * context must not be interrupted by softirqs. This interface is to be
> + * used when most of the read-side critical sections are in softirq context.
> + * RCU read-side critical sections are delimited by rcu_read_lock() and
> + * rcu_read_unlock(), if in interrupt context, or by rcu_read_lock_bh()
> + * and rcu_read_unlock_bh(), if in process context. These may be nested.
> + */
> +void fastcall call_rcu_bh(struct rcu_head *head,
> + void (*func)(struct rcu_head *rcu))
> +{
> + unsigned long flags;
> + struct rcu_data *rdp;
> +
> + head->func = func;
> + head->next = NULL;
> + local_irq_save(flags);
> + rdp = &__get_cpu_var(rcu_bh_data);
> + *rdp->nxttail = head;
> + rdp->nxttail = &head->next;
> +
> + if (unlikely(++rdp->qlen > qhimark)) {
> + rdp->blimit = INT_MAX;
> + force_quiescent_state(rdp, &rcu_bh_ctrlblk);
> + }
> +
> + local_irq_restore(flags);
> +}
> +
> +/*
> + * Return the number of RCU batches processed thus far. Useful
> + * for debug and statistics.
> + */
> +long rcu_batches_completed(void)
> +{
> + return rcu_ctrlblk.completed;
> +}
> +
> +/*
> + * Return the number of RCU bh batches processed thus far. Useful
> + * for debug and statistics.
> + */
> +long rcu_batches_completed_bh(void)
> +{
> + return rcu_bh_ctrlblk.completed;
> +}
> +
> +/*
> + * Invoke the completed RCU callbacks. They are expected to be in
> + * a per-cpu list.
> + */
> +static void rcu_do_batch(struct rcu_data *rdp)
> +{
> + struct rcu_head *next, *list;
> + int count = 0;
> +
> + list = rdp->donelist;
> + while (list) {
> + next = rdp->donelist = list->next;
> + list->func(list);
> + list = next;
> + rdp->qlen--;
> + if (++count >= rdp->blimit)
> + break;
> + }
> + if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
> + rdp->blimit = blimit;
> + if (!rdp->donelist)
> + rdp->donetail = &rdp->donelist;
> + else
> + tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
> +}
> +
> +/*
> + * Grace period handling:
> + * The grace period handling consists of two steps:
> + * - A new grace period is started.
> + * This is done by rcu_start_batch. The start is not broadcasted to
> + * all cpus, they must pick this up by comparing rcp->cur with
> + * rdp->quiescbatch. All cpus are recorded in the
> + * rcu_ctrlblk.cpumask bitmap.
> + * - All cpus must go through a quiescent state.
> + * Since the start of the grace period is not broadcasted, at least two
> + * calls to rcu_check_quiescent_state are required:
> + * The first call just notices that a new grace period is running. The
> + * following calls check if there was a quiescent state since the beginning
> + * of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
> + * the bitmap is empty, then the grace period is completed.
> + * rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
> + * period (if necessary).
> + */
> +/*
> + * Register a new batch of callbacks, and start it up if there is currently no
> + * active batch and the batch to be registered has not already occurred.
> + * Caller must hold rcu_ctrlblk.lock.
> + */
> +static void rcu_start_batch(struct rcu_ctrlblk *rcp)
> +{
> + if (rcp->next_pending &&
> + rcp->completed == rcp->cur) {
> + rcp->next_pending = 0;
> + /*
> + * next_pending == 0 must be visible in
> + * __rcu_process_callbacks() before it can see new value of cur.
> + */
> + smp_wmb();
> + rcp->cur++;
> +
> + /*
> + * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
> +		 * barrier. Otherwise it can cause tickless idle CPUs to be
> +		 * included in rcp->cpumask, which will extend grace periods
> + * unnecessarily.
> + */
> + smp_mb();
> + cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
> +
> + }
> +}
> +
> +/*
> + * cpu went through a quiescent state since the beginning of the grace period.
> + * Clear it from the cpu mask and complete the grace period if it was the last
> + * cpu. Start another grace period if someone has further entries pending
> + */
> +static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
> +{
> + cpu_clear(cpu, rcp->cpumask);
> + if (cpus_empty(rcp->cpumask)) {
> + /* batch completed ! */
> + rcp->completed = rcp->cur;
> + rcu_start_batch(rcp);
> + }
> +}
> +
> +/*
> + * Check if the cpu has gone through a quiescent state (say context
> + * switch). If so, and if it hasn't already done so in this RCU
> + * quiescent cycle, then indicate that it has done so.
> + */
> +static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
> + struct rcu_data *rdp)
> +{
> + if (rdp->quiescbatch != rcp->cur) {
> + /* start new grace period: */
> + rdp->qs_pending = 1;
> + rdp->passed_quiesc = 0;
> + rdp->quiescbatch = rcp->cur;
> + return;
> + }
> +
> + /* Grace period already completed for this cpu?
> + * qs_pending is checked instead of the actual bitmap to avoid
> +	 * cacheline thrashing.
> + */
> + if (!rdp->qs_pending)
> + return;
> +
> + /*
> + * Was there a quiescent state since the beginning of the grace
> + * period? If no, then exit and wait for the next call.
> + */
> + if (!rdp->passed_quiesc)
> + return;
> + rdp->qs_pending = 0;
> +
> + spin_lock(&rcp->lock);
> + /*
> + * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
> + * during cpu startup. Ignore the quiescent state.
> + */
> + if (likely(rdp->quiescbatch == rcp->cur))
> + cpu_quiet(rdp->cpu, rcp);
> +
> + spin_unlock(&rcp->lock);
> +}
> +
> +
> +#ifdef CONFIG_HOTPLUG_CPU
> +
> +/* Warning! Helper for rcu_offline_cpu. Do not use elsewhere without reviewing
> + * the locking requirements; the list it is pulling from has to belong to a cpu
> + * which is dead and hence not processing interrupts.
> + */
> +static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
> + struct rcu_head **tail)
> +{
> + local_irq_disable();
> + *this_rdp->nxttail = list;
> + if (list)
> + this_rdp->nxttail = tail;
> + local_irq_enable();
> +}
> +
> +static void __rcu_offline_cpu(struct rcu_data *this_rdp,
> + struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> +{
> + /* if the cpu going offline owns the grace period
> + * we can block indefinitely waiting for it, so flush
> + * it here
> + */
> + spin_lock_bh(&rcp->lock);
> + if (rcp->cur != rcp->completed)
> + cpu_quiet(rdp->cpu, rcp);
> + spin_unlock_bh(&rcp->lock);
> + rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
> + rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
> + rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
> +}
> +
> +static void rcu_offline_cpu(int cpu)
> +{
> + struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
> + struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
> +
> + __rcu_offline_cpu(this_rdp, &rcu_ctrlblk,
> + &per_cpu(rcu_data, cpu));
> + __rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk,
> + &per_cpu(rcu_bh_data, cpu));
> + put_cpu_var(rcu_data);
> + put_cpu_var(rcu_bh_data);
> + tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
> +}
> +
> +#else
> +
> +static void rcu_offline_cpu(int cpu)
> +{
> +}
> +
> +#endif
> +
> +/*
> + * This does the RCU processing work from tasklet context.
> + */
> +static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
> + struct rcu_data *rdp)
> +{
> + if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
> + *rdp->donetail = rdp->curlist;
> + rdp->donetail = rdp->curtail;
> + rdp->curlist = NULL;
> + rdp->curtail = &rdp->curlist;
> + }
> +
> + if (rdp->nxtlist && !rdp->curlist) {
> + local_irq_disable();
> + rdp->curlist = rdp->nxtlist;
> + rdp->curtail = rdp->nxttail;
> + rdp->nxtlist = NULL;
> + rdp->nxttail = &rdp->nxtlist;
> + local_irq_enable();
> +
> + /*
> + * start the next batch of callbacks
> + */
> +
> + /* determine batch number */
> + rdp->batch = rcp->cur + 1;
> + /* see the comment and corresponding wmb() in
> + * the rcu_start_batch()
> + */
> + smp_rmb();
> +
> + if (!rcp->next_pending) {
> + /* and start it/schedule start if it's a new batch */
> + spin_lock(&rcp->lock);
> + rcp->next_pending = 1;
> + rcu_start_batch(rcp);
> + spin_unlock(&rcp->lock);
> + }
> + }
> +
> + rcu_check_quiescent_state(rcp, rdp);
> + if (rdp->donelist)
> + rcu_do_batch(rdp);
> +}
> +
> +static void rcu_process_callbacks(unsigned long unused)
> +{
> + __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
> + __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> +}
> +
> +static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> +{
> + /* This cpu has pending rcu entries and the grace period
> + * for them has completed.
> + */
> + if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
> + return 1;
> +
> + /* This cpu has no pending entries, but there are new entries */
> + if (!rdp->curlist && rdp->nxtlist)
> + return 1;
> +
> + /* This cpu has finished callbacks to invoke */
> + if (rdp->donelist)
> + return 1;
> +
> + /* The rcu core waits for a quiescent state from the cpu */
> + if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
> + return 1;
> +
> + /* nothing to do */
> + return 0;
> +}
> +
> +/*
> + * Check to see if there is any immediate RCU-related work to be done
> + * by the current CPU, returning 1 if so. This function is part of the
> + * RCU implementation; it is -not- an exported member of the RCU API.
> + */
> +int rcu_pending(int cpu)
> +{
> + return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
> + __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
> +}
> +
> +/*
> + * Check to see if any future RCU-related work will need to be done
> + * by the current CPU, even if none need be done immediately, returning
> + * 1 if so. This function is part of the RCU implementation; it is -not-
> + * an exported member of the RCU API.
> + */
> +int rcu_needs_cpu(int cpu)
> +{
> + struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> + struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
> +
> + return (!!rdp->curlist || !!rdp_bh->curlist || rcu_pending(cpu));
> +}
> +
> +void rcu_check_callbacks(int cpu, int user)
> +{
> + if (user ||
> + (idle_cpu(cpu) && !in_softirq() &&
> + hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
> + rcu_qsctr_inc(cpu);
> + rcu_bh_qsctr_inc(cpu);
> + } else if (!in_softirq())
> + rcu_bh_qsctr_inc(cpu);
> + tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
> +}
> +
> +static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
> + struct rcu_data *rdp)
> +{
> + memset(rdp, 0, sizeof(*rdp));
> + rdp->curtail = &rdp->curlist;
> + rdp->nxttail = &rdp->nxtlist;
> + rdp->donetail = &rdp->donelist;
> + rdp->quiescbatch = rcp->completed;
> + rdp->qs_pending = 0;
> + rdp->cpu = cpu;
> + rdp->blimit = blimit;
> +}
> +
> +static void __devinit rcu_online_cpu(int cpu)
> +{
> + struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> + struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
> +
> + rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
> + rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
> + tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
> +}
> +
> +static int __devinit rcu_cpu_notify(struct notifier_block *self,
> + unsigned long action, void *hcpu)
> +{
> + long cpu = (long)hcpu;
> + switch (action) {
> + case CPU_UP_PREPARE:
> + rcu_online_cpu(cpu);
> + break;
> + case CPU_DEAD:
> + rcu_offline_cpu(cpu);
> + break;
> + default:
> + break;
> + }
> + return NOTIFY_OK;
> +}
> +
> +static struct notifier_block __devinitdata rcu_nb = {
> + .notifier_call = rcu_cpu_notify,
> +};
> +
> +/*
> + * Initializes rcu mechanism. Assumed to be called early.
> + * That is, before the local timer (SMP) or the jiffies timer (uniprocessor) is set up.
> + * Note that rcu_qsctr and friends are implicitly
> + * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
> + */
> +void __init __rcu_init(void)
> +{
> + rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
> + (void *)(long)smp_processor_id());
> + /* Register notifier for non-boot CPUs */
> + register_cpu_notifier(&rcu_nb);
> +}
> +
> +module_param(blimit, int, 0);
> +module_param(qhimark, int, 0);
> +module_param(qlowmark, int, 0);
> +#ifdef CONFIG_SMP
> +module_param(rsinterval, int, 0);
> +#endif
> +
> +EXPORT_SYMBOL_GPL(rcu_batches_completed);
> +EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
> +EXPORT_SYMBOL_GPL(call_rcu);
> +EXPORT_SYMBOL_GPL(call_rcu_bh);
> diff -puN kernel/rcupdate.c~rcu-split-classic kernel/rcupdate.c
> --- linux-2.6.18-rc3-rcu/kernel/rcupdate.c~rcu-split-classic 2006-08-06 03:07:10.000000000 +0530
> +++ linux-2.6.18-rc3-rcu-dipankar/kernel/rcupdate.c 2006-08-06 09:58:28.000000000 +0530
> @@ -40,155 +40,53 @@
> #include <linux/sched.h>
> #include <asm/atomic.h>
> #include <linux/bitops.h>
> -#include <linux/module.h>
> #include <linux/completion.h>
> -#include <linux/moduleparam.h>
> #include <linux/percpu.h>
> -#include <linux/notifier.h>
> -#include <linux/rcupdate.h>
> #include <linux/cpu.h>
> #include <linux/mutex.h>
> +#include <linux/module.h>
>
> -/* Definition for rcupdate control block. */
> -static struct rcu_ctrlblk rcu_ctrlblk = {
> - .cur = -300,
> - .completed = -300,
> - .lock = __SPIN_LOCK_UNLOCKED(&rcu_ctrlblk.lock),
> - .cpumask = CPU_MASK_NONE,
> -};
> -static struct rcu_ctrlblk rcu_bh_ctrlblk = {
> - .cur = -300,
> - .completed = -300,
> - .lock = __SPIN_LOCK_UNLOCKED(&rcu_bh_ctrlblk.lock),
> - .cpumask = CPU_MASK_NONE,
> +struct rcu_synchronize {
> + struct rcu_head head;
> + struct completion completion;
> };
>
> -DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
> -DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
> -
> -/* Fake initialization required by compiler */
> -static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
> -static int blimit = 10;
> -static int qhimark = 10000;
> -static int qlowmark = 100;
> -#ifdef CONFIG_SMP
> -static int rsinterval = 1000;
> -#endif
> -
> +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head);
> static atomic_t rcu_barrier_cpu_count;
> static DEFINE_MUTEX(rcu_barrier_mutex);
> static struct completion rcu_barrier_completion;
>
> -#ifdef CONFIG_SMP
> -static void force_quiescent_state(struct rcu_data *rdp,
> - struct rcu_ctrlblk *rcp)
> -{
> - int cpu;
> - cpumask_t cpumask;
> - set_need_resched();
> - if (unlikely(rdp->qlen - rdp->last_rs_qlen > rsinterval)) {
> - rdp->last_rs_qlen = rdp->qlen;
> - /*
> - * Don't send IPI to itself. With irqs disabled,
> - * rdp->cpu is the current cpu.
> - */
> - cpumask = rcp->cpumask;
> - cpu_clear(rdp->cpu, cpumask);
> - for_each_cpu_mask(cpu, cpumask)
> - smp_send_reschedule(cpu);
> - }
> -}
> -#else
> -static inline void force_quiescent_state(struct rcu_data *rdp,
> - struct rcu_ctrlblk *rcp)
> +/* Because of FASTCALL declaration of complete, we use this wrapper */
> +static void wakeme_after_rcu(struct rcu_head *head)
> {
> - set_need_resched();
> + struct rcu_synchronize *rcu;
> +
> + rcu = container_of(head, struct rcu_synchronize, head);
> + complete(&rcu->completion);
> }
> -#endif
>
> /**
> - * call_rcu - Queue an RCU callback for invocation after a grace period.
> - * @head: structure to be used for queueing the RCU updates.
> - * @func: actual update function to be invoked after the grace period
> + * synchronize_rcu - wait until a grace period has elapsed.
> *
> - * The update function will be invoked some time after a full grace
> - * period elapses, in other words after all currently executing RCU
> + * Control will return to the caller some time after a full grace
> + * period has elapsed, in other words after all currently executing RCU
> * read-side critical sections have completed. RCU read-side critical
> * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
> * and may be nested.
> - */
> -void fastcall call_rcu(struct rcu_head *head,
> - void (*func)(struct rcu_head *rcu))
> -{
> - unsigned long flags;
> - struct rcu_data *rdp;
> -
> - head->func = func;
> - head->next = NULL;
> - local_irq_save(flags);
> - rdp = &__get_cpu_var(rcu_data);
> - *rdp->nxttail = head;
> - rdp->nxttail = &head->next;
> - if (unlikely(++rdp->qlen > qhimark)) {
> - rdp->blimit = INT_MAX;
> - force_quiescent_state(rdp, &rcu_ctrlblk);
> - }
> - local_irq_restore(flags);
> -}
> -
> -/**
> - * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
> - * @head: structure to be used for queueing the RCU updates.
> - * @func: actual update function to be invoked after the grace period
> *
> - * The update function will be invoked some time after a full grace
> - * period elapses, in other words after all currently executing RCU
> - * read-side critical sections have completed. call_rcu_bh() assumes
> - * that the read-side critical sections end on completion of a softirq
> - * handler. This means that read-side critical sections in process
> - * context must not be interrupted by softirqs. This interface is to be
> - * used when most of the read-side critical sections are in softirq context.
> - * RCU read-side critical sections are delimited by rcu_read_lock() and
> - * rcu_read_unlock(), * if in interrupt context or rcu_read_lock_bh()
> - * and rcu_read_unlock_bh(), if in process context. These may be nested.
> - */
> -void fastcall call_rcu_bh(struct rcu_head *head,
> - void (*func)(struct rcu_head *rcu))
> -{
> - unsigned long flags;
> - struct rcu_data *rdp;
> -
> - head->func = func;
> - head->next = NULL;
> - local_irq_save(flags);
> - rdp = &__get_cpu_var(rcu_bh_data);
> - *rdp->nxttail = head;
> - rdp->nxttail = &head->next;
> -
> - if (unlikely(++rdp->qlen > qhimark)) {
> - rdp->blimit = INT_MAX;
> - force_quiescent_state(rdp, &rcu_bh_ctrlblk);
> - }
> -
> - local_irq_restore(flags);
> -}
> -
> -/*
> - * Return the number of RCU batches processed thus far. Useful
> - * for debug and statistics.
> - */
> -long rcu_batches_completed(void)
> -{
> - return rcu_ctrlblk.completed;
> -}
> -
> -/*
> - * Return the number of RCU batches processed thus far. Useful
> - * for debug and statistics.
> + * If your read-side code is not protected by rcu_read_lock(), do -not-
> + * use synchronize_rcu().
> */
> -long rcu_batches_completed_bh(void)
> +void synchronize_rcu(void)
> {
> - return rcu_bh_ctrlblk.completed;
> + struct rcu_synchronize rcu;
> +
> + init_completion(&rcu.completion);
> + /* Will wake me after RCU finished */
> + call_rcu(&rcu.head, wakeme_after_rcu);
> +
> + /* Wait for it */
> + wait_for_completion(&rcu.completion);
> }
>
> static void rcu_barrier_callback(struct rcu_head *notused)
> @@ -203,10 +101,8 @@ static void rcu_barrier_callback(struct
> static void rcu_barrier_func(void *notused)
> {
> int cpu = smp_processor_id();
> - struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> - struct rcu_head *head;
> + struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
>
> - head = &rdp->barrier;
> atomic_inc(&rcu_barrier_cpu_count);
> call_rcu(head, rcu_barrier_callback);
> }
> @@ -225,410 +121,11 @@ void rcu_barrier(void)
> wait_for_completion(&rcu_barrier_completion);
> mutex_unlock(&rcu_barrier_mutex);
> }
> -EXPORT_SYMBOL_GPL(rcu_barrier);
> -
> -/*
> - * Invoke the completed RCU callbacks. They are expected to be in
> - * a per-cpu list.
> - */
> -static void rcu_do_batch(struct rcu_data *rdp)
> -{
> - struct rcu_head *next, *list;
> - int count = 0;
> -
> - list = rdp->donelist;
> - while (list) {
> - next = rdp->donelist = list->next;
> - list->func(list);
> - list = next;
> - rdp->qlen--;
> - if (++count >= rdp->blimit)
> - break;
> - }
> - if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
> - rdp->blimit = blimit;
> - if (!rdp->donelist)
> - rdp->donetail = &rdp->donelist;
> - else
> - tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
> -}
> -
> -/*
> - * Grace period handling:
> - * The grace period handling consists out of two steps:
> - * - A new grace period is started.
> - * This is done by rcu_start_batch. The start is not broadcasted to
> - * all cpus, they must pick this up by comparing rcp->cur with
> - * rdp->quiescbatch. All cpus are recorded in the
> - * rcu_ctrlblk.cpumask bitmap.
> - * - All cpus must go through a quiescent state.
> - * Since the start of the grace period is not broadcasted, at least two
> - * calls to rcu_check_quiescent_state are required:
> - * The first call just notices that a new grace period is running. The
> - * following calls check if there was a quiescent state since the beginning
> - * of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
> - * the bitmap is empty, then the grace period is completed.
> - * rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
> - * period (if necessary).
> - */
> -/*
> - * Register a new batch of callbacks, and start it up if there is currently no
> - * active batch and the batch to be registered has not already occurred.
> - * Caller must hold rcu_ctrlblk.lock.
> - */
> -static void rcu_start_batch(struct rcu_ctrlblk *rcp)
> -{
> - if (rcp->next_pending &&
> - rcp->completed == rcp->cur) {
> - rcp->next_pending = 0;
> - /*
> - * next_pending == 0 must be visible in
> - * __rcu_process_callbacks() before it can see new value of cur.
> - */
> - smp_wmb();
> - rcp->cur++;
> -
> - /*
> - * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
> - * Barrier Otherwise it can cause tickless idle CPUs to be
> - * included in rcp->cpumask, which will extend graceperiods
> - * unnecessarily.
> - */
> - smp_mb();
> - cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
> -
> - }
> -}
> -
> -/*
> - * cpu went through a quiescent state since the beginning of the grace period.
> - * Clear it from the cpu mask and complete the grace period if it was the last
> - * cpu. Start another grace period if someone has further entries pending
> - */
> -static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
> -{
> - cpu_clear(cpu, rcp->cpumask);
> - if (cpus_empty(rcp->cpumask)) {
> - /* batch completed ! */
> - rcp->completed = rcp->cur;
> - rcu_start_batch(rcp);
> - }
> -}
> -
> -/*
> - * Check if the cpu has gone through a quiescent state (say context
> - * switch). If so and if it already hasn't done so in this RCU
> - * quiescent cycle, then indicate that it has done so.
> - */
> -static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
> - struct rcu_data *rdp)
> -{
> - if (rdp->quiescbatch != rcp->cur) {
> - /* start new grace period: */
> - rdp->qs_pending = 1;
> - rdp->passed_quiesc = 0;
> - rdp->quiescbatch = rcp->cur;
> - return;
> - }
> -
> - /* Grace period already completed for this cpu?
> - * qs_pending is checked instead of the actual bitmap to avoid
> - * cacheline trashing.
> - */
> - if (!rdp->qs_pending)
> - return;
> -
> - /*
> - * Was there a quiescent state since the beginning of the grace
> - * period? If no, then exit and wait for the next call.
> - */
> - if (!rdp->passed_quiesc)
> - return;
> - rdp->qs_pending = 0;
> -
> - spin_lock(&rcp->lock);
> - /*
> - * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
> - * during cpu startup. Ignore the quiescent state.
> - */
> - if (likely(rdp->quiescbatch == rcp->cur))
> - cpu_quiet(rdp->cpu, rcp);
> -
> - spin_unlock(&rcp->lock);
> -}
> -
> -
> -#ifdef CONFIG_HOTPLUG_CPU
> -
> -/* warning! helper for rcu_offline_cpu. do not use elsewhere without reviewing
> - * locking requirements, the list it's pulling from has to belong to a cpu
> - * which is dead and hence not processing interrupts.
> - */
> -static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
> - struct rcu_head **tail)
> -{
> - local_irq_disable();
> - *this_rdp->nxttail = list;
> - if (list)
> - this_rdp->nxttail = tail;
> - local_irq_enable();
> -}
> -
> -static void __rcu_offline_cpu(struct rcu_data *this_rdp,
> - struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> -{
> - /* if the cpu going offline owns the grace period
> - * we can block indefinitely waiting for it, so flush
> - * it here
> - */
> - spin_lock_bh(&rcp->lock);
> - if (rcp->cur != rcp->completed)
> - cpu_quiet(rdp->cpu, rcp);
> - spin_unlock_bh(&rcp->lock);
> - rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
> - rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
> - rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
> -}
> -
> -static void rcu_offline_cpu(int cpu)
> -{
> - struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
> - struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
> -
> - __rcu_offline_cpu(this_rdp, &rcu_ctrlblk,
> - &per_cpu(rcu_data, cpu));
> - __rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk,
> - &per_cpu(rcu_bh_data, cpu));
> - put_cpu_var(rcu_data);
> - put_cpu_var(rcu_bh_data);
> - tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
> -}
>
> -#else
> -
> -static void rcu_offline_cpu(int cpu)
> -{
> -}
> -
> -#endif
> -
> -/*
> - * This does the RCU processing work from tasklet context.
> - */
> -static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
> - struct rcu_data *rdp)
> -{
> - if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
> - *rdp->donetail = rdp->curlist;
> - rdp->donetail = rdp->curtail;
> - rdp->curlist = NULL;
> - rdp->curtail = &rdp->curlist;
> - }
> -
> - if (rdp->nxtlist && !rdp->curlist) {
> - local_irq_disable();
> - rdp->curlist = rdp->nxtlist;
> - rdp->curtail = rdp->nxttail;
> - rdp->nxtlist = NULL;
> - rdp->nxttail = &rdp->nxtlist;
> - local_irq_enable();
> -
> - /*
> - * start the next batch of callbacks
> - */
> -
> - /* determine batch number */
> - rdp->batch = rcp->cur + 1;
> - /* see the comment and corresponding wmb() in
> - * the rcu_start_batch()
> - */
> - smp_rmb();
> -
> - if (!rcp->next_pending) {
> - /* and start it/schedule start if it's a new batch */
> - spin_lock(&rcp->lock);
> - rcp->next_pending = 1;
> - rcu_start_batch(rcp);
> - spin_unlock(&rcp->lock);
> - }
> - }
> -
> - rcu_check_quiescent_state(rcp, rdp);
> - if (rdp->donelist)
> - rcu_do_batch(rdp);
> -}
> -
> -static void rcu_process_callbacks(unsigned long unused)
> -{
> - __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
> - __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> -}
> -
> -static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> -{
> - /* This cpu has pending rcu entries and the grace period
> - * for them has completed.
> - */
> - if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
> - return 1;
> -
> - /* This cpu has no pending entries, but there are new entries */
> - if (!rdp->curlist && rdp->nxtlist)
> - return 1;
> -
> - /* This cpu has finished callbacks to invoke */
> - if (rdp->donelist)
> - return 1;
> -
> - /* The rcu core waits for a quiescent state from the cpu */
> - if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
> - return 1;
> -
> - /* nothing to do */
> - return 0;
> -}
> -
> -/*
> - * Check to see if there is any immediate RCU-related work to be done
> - * by the current CPU, returning 1 if so. This function is part of the
> - * RCU implementation; it is -not- an exported member of the RCU API.
> - */
> -int rcu_pending(int cpu)
> -{
> - return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
> - __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
> -}
> -
> -/*
> - * Check to see if any future RCU-related work will need to be done
> - * by the current CPU, even if none need be done immediately, returning
> - * 1 if so. This function is part of the RCU implementation; it is -not-
> - * an exported member of the RCU API.
> - */
> -int rcu_needs_cpu(int cpu)
> -{
> - struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> - struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
> -
> - return (!!rdp->curlist || !!rdp_bh->curlist || rcu_pending(cpu));
> -}
> -
> -void rcu_check_callbacks(int cpu, int user)
> -{
> - if (user ||
> - (idle_cpu(cpu) && !in_softirq() &&
> - hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
> - rcu_qsctr_inc(cpu);
> - rcu_bh_qsctr_inc(cpu);
> - } else if (!in_softirq())
> - rcu_bh_qsctr_inc(cpu);
> - tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
> -}
> -
> -static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
> - struct rcu_data *rdp)
> -{
> - memset(rdp, 0, sizeof(*rdp));
> - rdp->curtail = &rdp->curlist;
> - rdp->nxttail = &rdp->nxtlist;
> - rdp->donetail = &rdp->donelist;
> - rdp->quiescbatch = rcp->completed;
> - rdp->qs_pending = 0;
> - rdp->cpu = cpu;
> - rdp->blimit = blimit;
> -}
> -
> -static void __devinit rcu_online_cpu(int cpu)
> -{
> - struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> - struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
> -
> - rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
> - rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
> - tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
> -}
> -
> -static int __devinit rcu_cpu_notify(struct notifier_block *self,
> - unsigned long action, void *hcpu)
> -{
> - long cpu = (long)hcpu;
> - switch (action) {
> - case CPU_UP_PREPARE:
> - rcu_online_cpu(cpu);
> - break;
> - case CPU_DEAD:
> - rcu_offline_cpu(cpu);
> - break;
> - default:
> - break;
> - }
> - return NOTIFY_OK;
> -}
> -
> -static struct notifier_block __devinitdata rcu_nb = {
> - .notifier_call = rcu_cpu_notify,
> -};
> -
> -/*
> - * Initializes rcu mechanism. Assumed to be called early.
> - * That is before local timer(SMP) or jiffie timer (uniproc) is setup.
> - * Note that rcu_qsctr and friends are implicitly
> - * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
> - */
> void __init rcu_init(void)
> {
> - rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
> - (void *)(long)smp_processor_id());
> - /* Register notifier for non-boot CPUs */
> - register_cpu_notifier(&rcu_nb);
> -}
> -
> -struct rcu_synchronize {
> - struct rcu_head head;
> - struct completion completion;
> -};
> -
> -/* Because of FASTCALL declaration of complete, we use this wrapper */
> -static void wakeme_after_rcu(struct rcu_head *head)
> -{
> - struct rcu_synchronize *rcu;
> -
> - rcu = container_of(head, struct rcu_synchronize, head);
> - complete(&rcu->completion);
> -}
> -
> -/**
> - * synchronize_rcu - wait until a grace period has elapsed.
> - *
> - * Control will return to the caller some time after a full grace
> - * period has elapsed, in other words after all currently executing RCU
> - * read-side critical sections have completed. RCU read-side critical
> - * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
> - * and may be nested.
> - *
> - * If your read-side code is not protected by rcu_read_lock(), do -not-
> - * use synchronize_rcu().
> - */
> -void synchronize_rcu(void)
> -{
> - struct rcu_synchronize rcu;
> -
> - init_completion(&rcu.completion);
> - /* Will wake me after RCU finished */
> - call_rcu(&rcu.head, wakeme_after_rcu);
> -
> - /* Wait for it */
> - wait_for_completion(&rcu.completion);
> + __rcu_init();
> }
>
> -module_param(blimit, int, 0);
> -module_param(qhimark, int, 0);
> -module_param(qlowmark, int, 0);
> -#ifdef CONFIG_SMP
> -module_param(rsinterval, int, 0);
> -#endif
> -EXPORT_SYMBOL_GPL(rcu_batches_completed);
> -EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
> -EXPORT_SYMBOL_GPL(call_rcu);
> -EXPORT_SYMBOL_GPL(call_rcu_bh);
> +EXPORT_SYMBOL_GPL(rcu_barrier);
> EXPORT_SYMBOL_GPL(synchronize_rcu);
>
> _
> -
>
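For reference, the split above is purely an internal reorganization: callers keep
including rcupdate.h and keep using rcu_read_lock()/rcu_read_unlock(),
rcu_dereference() and call_rcu() exactly as the kerneldoc in the patch describes.
A minimal usage sketch under that API follows; struct foo, gp, foo_get_val() and
the other names are hypothetical, for illustration only, and rcu_assign_pointer()
on the update side is the existing kernel primitive rather than something added
by this patch.

	#include <linux/rcupdate.h>
	#include <linux/spinlock.h>
	#include <linux/slab.h>

	/* Hypothetical example: an RCU-protected pointer to a heap object. */
	struct foo {
		int val;
		struct rcu_head rcu;
	};

	static struct foo *gp;			/* RCU-protected pointer */
	static DEFINE_SPINLOCK(gp_lock);	/* serializes updaters only */

	/* Reader: may run concurrently with an update, must not block. */
	int foo_get_val(void)
	{
		struct foo *p;
		int val = -1;

		rcu_read_lock();
		p = rcu_dereference(gp);
		if (p)
			val = p->val;
		rcu_read_unlock();
		return val;
	}

	/* Runs after a grace period, when no reader can still see "old". */
	static void foo_reclaim(struct rcu_head *head)
	{
		kfree(container_of(head, struct foo, rcu));
	}

	/* Updater: publish the new version, defer freeing the old one. */
	void foo_set_val(struct foo *newp)
	{
		struct foo *old;

		spin_lock(&gp_lock);
		old = gp;
		rcu_assign_pointer(gp, newp);
		spin_unlock(&gp_lock);
		if (old)
			call_rcu(&old->rcu, foo_reclaim);
	}

Whether the classic implementation now living in rcuclassic.c or another
implementation is selected, code written against this interface is unaffected,
which is the point of the split.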

2006-08-31 01:13:07

by Paul E. McKenney

[permalink] [raw]
Subject: Re: [PATCH 2/4] RCU: use a separate softirq

On Mon, Aug 28, 2006 at 09:41:12PM +0530, Dipankar Sarma wrote:
>
> Finally, RCU gets its own softirq. With RCU being used so extensively,
> the per-cpu tasklet used earlier was effectively a softirq with extra
> overhead on top. Using a dedicated softirq makes things more efficient.

Acked-by: Paul E. McKenney <[email protected]>

> Signed-off-by: Dipankar Sarma <[email protected]>
> ---
>
>
> include/linux/interrupt.h | 3 ++-
> kernel/rcuclassic.c | 12 +++++-------
> 2 files changed, 7 insertions(+), 8 deletions(-)
>
> diff -puN kernel/rcuclassic.c~rcu-softirq kernel/rcuclassic.c
> --- linux-2.6.18-rc3-rcu/kernel/rcuclassic.c~rcu-softirq 2006-08-27 01:01:15.000000000 +0530
> +++ linux-2.6.18-rc3-rcu-dipankar/kernel/rcuclassic.c 2006-08-27 01:01:15.000000000 +0530
> @@ -69,7 +69,6 @@ DEFINE_PER_CPU(struct rcu_data, rcu_data
> DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
>
> /* Fake initialization required by compiler */
> -static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
> static int blimit = 10;
> static int qhimark = 10000;
> static int qlowmark = 100;
> @@ -212,7 +211,7 @@ static void rcu_do_batch(struct rcu_data
> if (!rdp->donelist)
> rdp->donetail = &rdp->donelist;
> else
> - tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
> + raise_softirq(RCU_SOFTIRQ);
> }
>
> /*
> @@ -363,7 +362,6 @@ static void rcu_offline_cpu(int cpu)
> &per_cpu(rcu_bh_data, cpu));
> put_cpu_var(rcu_data);
> put_cpu_var(rcu_bh_data);
> - tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
> }
>
> #else
> @@ -375,7 +373,7 @@ static void rcu_offline_cpu(int cpu)
> #endif
>
> /*
> - * This does the RCU processing work from tasklet context.
> + * This does the RCU processing work from softirq context.
> */
> static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
> struct rcu_data *rdp)
> @@ -420,7 +418,7 @@ static void __rcu_process_callbacks(stru
> rcu_do_batch(rdp);
> }
>
> -static void rcu_process_callbacks(unsigned long unused)
> +static void rcu_process_callbacks(struct softirq_action *unused)
> {
> __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
> __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> @@ -484,7 +482,7 @@ void rcu_check_callbacks(int cpu, int us
> rcu_bh_qsctr_inc(cpu);
> } else if (!in_softirq())
> rcu_bh_qsctr_inc(cpu);
> - tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
> + raise_softirq(RCU_SOFTIRQ);
> }
>
> static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
> @@ -507,7 +505,7 @@ static void __devinit rcu_online_cpu(int
>
> rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
> rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
> - tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
> + open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
> }
>
> static int __devinit rcu_cpu_notify(struct notifier_block *self,
> diff -puN include/linux/interrupt.h~rcu-softirq include/linux/interrupt.h
> --- linux-2.6.18-rc3-rcu/include/linux/interrupt.h~rcu-softirq 2006-08-27 01:01:15.000000000 +0530
> +++ linux-2.6.18-rc3-rcu-dipankar/include/linux/interrupt.h 2006-08-27 01:01:15.000000000 +0530
> @@ -219,7 +219,8 @@ enum
> NET_TX_SOFTIRQ,
> NET_RX_SOFTIRQ,
> BLOCK_SOFTIRQ,
> - TASKLET_SOFTIRQ
> + TASKLET_SOFTIRQ,
> +	RCU_SOFTIRQ	/* Preferably RCU should always be the last softirq */
> };
>
> /* softirq mask and active fields moved to irq_cpustat_t in
>
> _
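For readers less familiar with the softirq machinery this patch switches to, a
minimal sketch of the open_softirq()/raise_softirq() pattern used above follows.
The FOO_SOFTIRQ number and foo_action() handler are hypothetical; a real softirq
is added by extending the enum in include/linux/interrupt.h, exactly as
RCU_SOFTIRQ is added in this patch, and the three-argument open_softirq() shown
matches the call the patch itself makes.

	#include <linux/init.h>
	#include <linux/interrupt.h>

	/* Hypothetical handler: runs in softirq context on the CPU where the
	 * softirq was raised, with interrupts enabled but preemption off. */
	static void foo_action(struct softirq_action *unused)
	{
		/* drain this CPU's pending work, e.g. a per-cpu list */
	}

	static void __init foo_softirq_init(void)
	{
		/* Register the handler once at boot; the last argument is
		 * per-softirq data, passed as NULL here as in the patch. */
		open_softirq(FOO_SOFTIRQ, foo_action, NULL);
	}

	/* From interrupt or process context: mark work pending on this CPU;
	 * the handler then runs on interrupt exit or from ksoftirqd. */
	static void foo_kick(void)
	{
		raise_softirq(FOO_SOFTIRQ);
	}

Compared to the per-cpu tasklet this replaces, there is no struct tasklet_struct
to initialize for every CPU, no tasklet_schedule()/tasklet_kill_immediate()
bookkeeping, and no extra dispatch through TASKLET_SOFTIRQ, which is the
overhead the patch description refers to.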