2007-01-15 19:20:00

by Dipankar Sarma

Subject: [mm PATCH] RCU: various patches

Andrew,

Please include this patchset for some testing in -mm.

This patchset consists of various merge candidates that would
do well to get some testing in -mm. It breaks the RCU
implementation out from its APIs to allow multiple
implementations, gives RCU its own softirq, and finally
lines up preemptible RCU from the -rt tree as a configurable
RCU implementation for mainline. The series was published
earlier and has been re-diffed against -mm. One major change
since the last posting is a new implementation of preemptible
RCU from Paul, which fixes the problem with the watchdog NMI.
For details, see http://lkml.org/lkml/2006/10/13/259
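
As a quick orientation, the split boils down to the common API header
selecting exactly one implementation header at build time and forwarding
the public calls to it. The sketch below is distilled from the rcupdate.h
hunks further down in this series; it is not a complete header:

#ifdef CONFIG_CLASSIC_RCU
#include <linux/rcuclassic.h>	/* classic, preempt-disabling implementation */
#else
#include <linux/rcupreempt.h>	/* preemptible implementation from the -rt tree */
#endif

/* callers keep using the common wrappers, for example: */
#define rcu_read_lock()		__rcu_read_lock()
#define rcu_read_unlock()	__rcu_read_unlock()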

The patches have been tested lightly using combinations of
dbench, kernbench and LTP (with both CONFIG_CLASSIC_RCU=y and
CONFIG_PREEMPT_RCU=y) on x86_64 and ppc64. I also ran
rcutorture successfully on my x86_64 box with both
RCU implementations.

Thanks
Dipankar


2007-01-15 19:22:16

by Dipankar Sarma

Subject: Re: [mm PATCH 1/6] RCU: split classic rcu




This patch re-organizes the RCU code to enable multiple implementations
of RCU. Users of RCU continue to include rcupdate.h and the
RCU interfaces remain the same. This is in preparation for
subsequently merging the preemptible RCU implementation.

Signed-off-by: Dipankar Sarma <[email protected]>
---
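
Since the point of the patch is that callers are untouched, below is a
minimal usage sketch (a hypothetical struct foo, not taken from the patch,
and assuming a single updater or external update-side locking) showing
that readers and updaters still only need rcupdate.h:

#include <linux/rcupdate.h>
#include <linux/slab.h>

struct foo {
	int val;
	struct rcu_head rcu;
};

static struct foo *global_foo;

/* Reader: unchanged by the split; still rcu_read_lock()/rcu_dereference(). */
static int read_foo_val(void)
{
	struct foo *p;
	int val;

	rcu_read_lock();
	p = rcu_dereference(global_foo);
	val = p ? p->val : -1;
	rcu_read_unlock();
	return val;
}

static void free_foo_rcu(struct rcu_head *head)
{
	kfree(container_of(head, struct foo, rcu));
}

/* Updater: publish the new version, reclaim the old one after a grace period. */
static void update_foo(struct foo *new_foo)
{
	struct foo *old = global_foo;

	rcu_assign_pointer(global_foo, new_foo);
	if (old)
		call_rcu(&old->rcu, free_foo_rcu);
}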





diff -puN /dev/null include/linux/rcuclassic.h
--- /dev/null 2006-03-26 18:34:52.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/rcuclassic.h 2007-01-15 15:35:05.000000000 +0530
@@ -0,0 +1,148 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion (classic version)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright IBM Corporation, 2001
+ *
+ * Author: Dipankar Sarma <[email protected]>
+ *
+ * Based on the original work by Paul McKenney <[email protected]>
+ * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
+ * Papers:
+ * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
+ * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * http://lse.sourceforge.net/locking/rcupdate.html
+ *
+ */
+
+#ifndef __LINUX_RCUCLASSIC_H
+#define __LINUX_RCUCLASSIC_H
+
+#ifdef __KERNEL__
+
+#include <linux/cache.h>
+#include <linux/spinlock.h>
+#include <linux/threads.h>
+#include <linux/percpu.h>
+#include <linux/cpumask.h>
+#include <linux/seqlock.h>
+
+
+/* Global control variables for rcupdate callback mechanism. */
+struct rcu_ctrlblk {
+ long cur; /* Current batch number. */
+ long completed; /* Number of the last completed batch */
+ int next_pending; /* Is the next batch already waiting? */
+
+ int signaled;
+
+ spinlock_t lock ____cacheline_internodealigned_in_smp;
+ cpumask_t cpumask; /* CPUs that need to switch in order */
+ /* for current batch to proceed. */
+} ____cacheline_internodealigned_in_smp;
+
+/* Is batch a before batch b ? */
+static inline int rcu_batch_before(long a, long b)
+{
+ return (a - b) < 0;
+}
+
+/* Is batch a after batch b ? */
+static inline int rcu_batch_after(long a, long b)
+{
+ return (a - b) > 0;
+}
+
+/*
+ * Per-CPU data for Read-Copy UPdate.
+ * nxtlist - new callbacks are added here
+ * curlist - current batch for which quiescent cycle started if any
+ */
+struct rcu_data {
+ /* 1) quiescent state handling : */
+ long quiescbatch; /* Batch # for grace period */
+ int passed_quiesc; /* User-mode/idle loop etc. */
+ int qs_pending; /* core waits for quiesc state */
+
+ /* 2) batch handling */
+ long batch; /* Batch # for current RCU batch */
+ struct rcu_head *nxtlist;
+ struct rcu_head **nxttail;
+ long qlen; /* # of queued callbacks */
+ struct rcu_head *curlist;
+ struct rcu_head **curtail;
+ struct rcu_head *donelist;
+ struct rcu_head **donetail;
+ long blimit; /* Upper limit on a processed batch */
+ int cpu;
+};
+
+DECLARE_PER_CPU(struct rcu_data, rcu_data);
+DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);
+
+/*
+ * Increment the quiescent state counter.
+ * The counter is a bit degenerated: We do not need to know
+ * how many quiescent states passed, just if there was at least
+ * one since the start of the grace period. Thus just a flag.
+ */
+static inline void rcu_qsctr_inc(int cpu)
+{
+ struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+ rdp->passed_quiesc = 1;
+}
+static inline void rcu_bh_qsctr_inc(int cpu)
+{
+ struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
+ rdp->passed_quiesc = 1;
+}
+
+extern int rcu_pending(int cpu);
+extern int rcu_needs_cpu(int cpu);
+
+#define __rcu_read_lock() \
+ do { \
+ preempt_disable(); \
+ __acquire(RCU); \
+ } while(0)
+#define __rcu_read_unlock() \
+ do { \
+ __release(RCU); \
+ preempt_enable(); \
+ } while(0)
+
+#define __rcu_read_lock_bh() \
+ do { \
+ local_bh_disable(); \
+ __acquire(RCU_BH); \
+ } while(0)
+#define __rcu_read_unlock_bh() \
+ do { \
+ __release(RCU_BH); \
+ local_bh_enable(); \
+ } while(0)
+
+#define __synchronize_sched() synchronize_rcu()
+
+extern void __rcu_init(void);
+extern void rcu_check_callbacks(int cpu, int user);
+extern void rcu_restart_cpu(int cpu);
+extern long rcu_batches_completed(void);
+
+#endif /* __KERNEL__ */
+#endif /* __LINUX_RCUCLASSIC_H */
diff -puN include/linux/rcupdate.h~rcu-split-classic include/linux/rcupdate.h
--- linux-2.6.20-rc3-mm1-rcu/include/linux/rcupdate.h~rcu-split-classic 2007-01-14 23:04:09.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/rcupdate.h 2007-01-15 15:36:34.000000000 +0530
@@ -15,7 +15,7 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
- * Copyright (C) IBM Corporation, 2001
+ * Copyright IBM Corporation, 2001
*
* Author: Dipankar Sarma <[email protected]>
*
@@ -41,6 +41,7 @@
#include <linux/percpu.h>
#include <linux/cpumask.h>
#include <linux/seqlock.h>
+#include <linux/rcuclassic.h>

/**
* struct rcu_head - callback structure for use with RCU
@@ -58,81 +59,6 @@ struct rcu_head {
(ptr)->next = NULL; (ptr)->func = NULL; \
} while (0)

-
-
-/* Global control variables for rcupdate callback mechanism. */
-struct rcu_ctrlblk {
- long cur; /* Current batch number. */
- long completed; /* Number of the last completed batch */
- int next_pending; /* Is the next batch already waiting? */
-
- int signaled;
-
- spinlock_t lock ____cacheline_internodealigned_in_smp;
- cpumask_t cpumask; /* CPUs that need to switch in order */
- /* for current batch to proceed. */
-} ____cacheline_internodealigned_in_smp;
-
-/* Is batch a before batch b ? */
-static inline int rcu_batch_before(long a, long b)
-{
- return (a - b) < 0;
-}
-
-/* Is batch a after batch b ? */
-static inline int rcu_batch_after(long a, long b)
-{
- return (a - b) > 0;
-}
-
-/*
- * Per-CPU data for Read-Copy UPdate.
- * nxtlist - new callbacks are added here
- * curlist - current batch for which quiescent cycle started if any
- */
-struct rcu_data {
- /* 1) quiescent state handling : */
- long quiescbatch; /* Batch # for grace period */
- int passed_quiesc; /* User-mode/idle loop etc. */
- int qs_pending; /* core waits for quiesc state */
-
- /* 2) batch handling */
- long batch; /* Batch # for current RCU batch */
- struct rcu_head *nxtlist;
- struct rcu_head **nxttail;
- long qlen; /* # of queued callbacks */
- struct rcu_head *curlist;
- struct rcu_head **curtail;
- struct rcu_head *donelist;
- struct rcu_head **donetail;
- long blimit; /* Upper limit on a processed batch */
- int cpu;
- struct rcu_head barrier;
-};
-
-DECLARE_PER_CPU(struct rcu_data, rcu_data);
-DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);
-
-/*
- * Increment the quiescent state counter.
- * The counter is a bit degenerated: We do not need to know
- * how many quiescent states passed, just if there was at least
- * one since the start of the grace period. Thus just a flag.
- */
-static inline void rcu_qsctr_inc(int cpu)
-{
- struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
- rdp->passed_quiesc = 1;
-}
-static inline void rcu_bh_qsctr_inc(int cpu)
-{
- struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
- rdp->passed_quiesc = 1;
-}
-
-extern int rcu_pending(int cpu);
-extern int rcu_needs_cpu(int cpu);
-
/**
* rcu_read_lock - mark the beginning of an RCU read-side critical section.
*
@@ -162,22 +88,14 @@ extern int rcu_needs_cpu(int cpu);
*
* It is illegal to block while in an RCU read-side critical section.
*/
-#define rcu_read_lock() \
- do { \
- preempt_disable(); \
- __acquire(RCU); \
- } while(0)
+#define rcu_read_lock() __rcu_read_lock()

/**
* rcu_read_unlock - marks the end of an RCU read-side critical section.
*
* See rcu_read_lock() for more information.
*/
-#define rcu_read_unlock() \
- do { \
- __release(RCU); \
- preempt_enable(); \
- } while(0)
+#define rcu_read_unlock() __rcu_read_unlock()

/*
* So where is rcu_write_lock()? It does not exist, as there is no
@@ -200,23 +118,15 @@ extern int rcu_needs_cpu(int cpu);
* can use just rcu_read_lock().
*
*/
-#define rcu_read_lock_bh() \
- do { \
- local_bh_disable(); \
- __acquire(RCU_BH); \
- } while(0)
-
-/*
+#define rcu_read_lock_bh() __rcu_read_lock_bh()
+
+/**
* rcu_read_unlock_bh - marks the end of a softirq-only RCU critical section
*
* See rcu_read_lock_bh() for more information.
*/
-#define rcu_read_unlock_bh() \
- do { \
- __release(RCU_BH); \
- local_bh_enable(); \
- } while(0)
-
+#define rcu_read_unlock_bh() __rcu_read_unlock_bh()
+
/**
* rcu_dereference - fetch an RCU-protected pointer in an
* RCU read-side critical section. This pointer may later
@@ -267,22 +177,49 @@ extern int rcu_needs_cpu(int cpu);
* In "classic RCU", these two guarantees happen to be one and
* the same, but can differ in realtime RCU implementations.
*/
-#define synchronize_sched() synchronize_rcu()
+#define synchronize_sched() __synchronize_sched()
+
+/**
+ * call_rcu - Queue an RCU callback for invocation after a grace period.
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual update function to be invoked after the grace period
+ *
+ * The update function will be invoked some time after a full grace
+ * period elapses, in other words after all currently executing RCU
+ * read-side critical sections have completed. RCU read-side critical
+ * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
+ * and may be nested.
+ */
+extern void FASTCALL(call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *head)));

-extern void rcu_init(void);
-extern void rcu_check_callbacks(int cpu, int user);
-extern void rcu_restart_cpu(int cpu);
-extern long rcu_batches_completed(void);
-extern long rcu_batches_completed_bh(void);

-/* Exported interfaces */
-extern void FASTCALL(call_rcu(struct rcu_head *head,
- void (*func)(struct rcu_head *head)));
+/**
+ * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual update function to be invoked after the grace period
+ *
+ * The update function will be invoked some time after a full grace
+ * period elapses, in other words after all currently executing RCU
+ * read-side critical sections have completed. call_rcu_bh() assumes
+ * that the read-side critical sections end on completion of a softirq
+ * handler. This means that read-side critical sections in process
+ * context must not be interrupted by softirqs. This interface is to be
+ * used when most of the read-side critical sections are in softirq context.
+ * RCU read-side critical sections are delimited by rcu_read_lock() and
+ * rcu_read_unlock(), * if in interrupt context or rcu_read_lock_bh()
+ * and rcu_read_unlock_bh(), if in process context. These may be nested.
+ */
extern void FASTCALL(call_rcu_bh(struct rcu_head *head,
void (*func)(struct rcu_head *head)));
+
+/* Exported common interfaces */
extern void synchronize_rcu(void);
-void synchronize_idle(void);
extern void rcu_barrier(void);
+
+/* Internal to kernel */
+extern void rcu_init(void);
+extern void rcu_check_callbacks(int cpu, int user);

#endif /* __KERNEL__ */
#endif /* __LINUX_RCUPDATE_H */
diff -puN kernel/Makefile~rcu-split-classic kernel/Makefile
--- linux-2.6.20-rc3-mm1-rcu/kernel/Makefile~rcu-split-classic 2007-01-14 23:04:09.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/Makefile 2007-01-15 15:34:21.000000000 +0530
@@ -6,7 +6,7 @@ obj-y = sched.o fork.o exec_domain.o
exit.o itimer.o time.o softirq.o resource.o \
sysctl.o capability.o ptrace.o timer.o user.o user_namespace.o \
signal.o sys.o kmod.o workqueue.o pid.o \
- rcupdate.o extable.o params.o posix-timers.o \
+ rcupdate.o rcuclassic.o extable.o params.o posix-timers.o \
kthread.o wait.o kfifo.o sys_ni.o posix-cpu-timers.o mutex.o \
hrtimer.o rwsem.o latency.o nsproxy.o srcu.o

diff -puN /dev/null kernel/rcuclassic.c
--- /dev/null 2006-03-26 18:34:52.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcuclassic.c 2007-01-15 15:34:47.000000000 +0530
@@ -0,0 +1,558 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion, classic implementation
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright IBM Corporation, 2001
+ *
+ * Authors: Dipankar Sarma <[email protected]>
+ * Manfred Spraul <[email protected]>
+ *
+ * Based on the original work by Paul McKenney <[email protected]>
+ * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
+ *
+ * Papers: http://www.rdrop.com/users/paulmck/RCU
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * Documentation/RCU/ *.txt
+ *
+ */
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/spinlock.h>
+#include <linux/smp.h>
+#include <linux/rcupdate.h>
+#include <linux/interrupt.h>
+#include <linux/sched.h>
+#include <asm/atomic.h>
+#include <linux/bitops.h>
+#include <linux/module.h>
+#include <linux/completion.h>
+#include <linux/moduleparam.h>
+#include <linux/percpu.h>
+#include <linux/notifier.h>
+#include <linux/rcupdate.h>
+#include <linux/cpu.h>
+#include <linux/random.h>
+#include <linux/delay.h>
+#include <linux/byteorder/swabb.h>
+
+
+/* Definition for rcupdate control block. */
+static struct rcu_ctrlblk rcu_ctrlblk = {
+ .cur = -300,
+ .completed = -300,
+ .lock = __SPIN_LOCK_UNLOCKED(&rcu_ctrlblk.lock),
+ .cpumask = CPU_MASK_NONE,
+};
+static struct rcu_ctrlblk rcu_bh_ctrlblk = {
+ .cur = -300,
+ .completed = -300,
+ .lock = __SPIN_LOCK_UNLOCKED(&rcu_bh_ctrlblk.lock),
+ .cpumask = CPU_MASK_NONE,
+};
+
+DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
+DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
+
+/* Fake initialization required by compiler */
+static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
+static int blimit = 10;
+static int qhimark = 10000;
+static int qlowmark = 100;
+
+#ifdef CONFIG_SMP
+static void force_quiescent_state(struct rcu_data *rdp,
+ struct rcu_ctrlblk *rcp)
+{
+ int cpu;
+ cpumask_t cpumask;
+ set_need_resched();
+ if (unlikely(!rcp->signaled)) {
+ rcp->signaled = 1;
+ /*
+ * Don't send IPI to itself. With irqs disabled,
+ * rdp->cpu is the current cpu.
+ */
+ cpumask = rcp->cpumask;
+ cpu_clear(rdp->cpu, cpumask);
+ for_each_cpu_mask(cpu, cpumask)
+ smp_send_reschedule(cpu);
+ }
+}
+#else
+static inline void force_quiescent_state(struct rcu_data *rdp,
+ struct rcu_ctrlblk *rcp)
+{
+ set_need_resched();
+}
+#endif
+
+/**
+ * call_rcu - Queue an RCU callback for invocation after a grace period.
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual update function to be invoked after the grace period
+ *
+ * The update function will be invoked some time after a full grace
+ * period elapses, in other words after all currently executing RCU
+ * read-side critical sections have completed. RCU read-side critical
+ * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
+ * and may be nested.
+ */
+void fastcall call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *rcu))
+{
+ unsigned long flags;
+ struct rcu_data *rdp;
+
+ head->func = func;
+ head->next = NULL;
+ local_irq_save(flags);
+ rdp = &__get_cpu_var(rcu_data);
+ *rdp->nxttail = head;
+ rdp->nxttail = &head->next;
+ if (unlikely(++rdp->qlen > qhimark)) {
+ rdp->blimit = INT_MAX;
+ force_quiescent_state(rdp, &rcu_ctrlblk);
+ }
+ local_irq_restore(flags);
+}
+
+/**
+ * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual update function to be invoked after the grace period
+ *
+ * The update function will be invoked some time after a full grace
+ * period elapses, in other words after all currently executing RCU
+ * read-side critical sections have completed. call_rcu_bh() assumes
+ * that the read-side critical sections end on completion of a softirq
+ * handler. This means that read-side critical sections in process
+ * context must not be interrupted by softirqs. This interface is to be
+ * used when most of the read-side critical sections are in softirq context.
+ * RCU read-side critical sections are delimited by rcu_read_lock() and
+ * rcu_read_unlock(), * if in interrupt context or rcu_read_lock_bh()
+ * and rcu_read_unlock_bh(), if in process context. These may be nested.
+ */
+void fastcall call_rcu_bh(struct rcu_head *head,
+ void (*func)(struct rcu_head *rcu))
+{
+ unsigned long flags;
+ struct rcu_data *rdp;
+
+ head->func = func;
+ head->next = NULL;
+ local_irq_save(flags);
+ rdp = &__get_cpu_var(rcu_bh_data);
+ *rdp->nxttail = head;
+ rdp->nxttail = &head->next;
+
+ if (unlikely(++rdp->qlen > qhimark)) {
+ rdp->blimit = INT_MAX;
+ force_quiescent_state(rdp, &rcu_bh_ctrlblk);
+ }
+
+ local_irq_restore(flags);
+}
+
+/*
+ * Return the number of RCU batches processed thus far. Useful
+ * for debug and statistics.
+ */
+long rcu_batches_completed(void)
+{
+ return rcu_ctrlblk.completed;
+}
+
+/*
+ * Return the number of RCU batches processed thus far. Useful
+ * for debug and statistics.
+ */
+long rcu_batches_completed_bh(void)
+{
+ return rcu_bh_ctrlblk.completed;
+}
+
+/*
+ * Invoke the completed RCU callbacks. They are expected to be in
+ * a per-cpu list.
+ */
+static void rcu_do_batch(struct rcu_data *rdp)
+{
+ struct rcu_head *next, *list;
+ int count = 0;
+
+ list = rdp->donelist;
+ while (list) {
+ next = list->next;
+ prefetch(next);
+ list->func(list);
+ list = next;
+ if (++count >= rdp->blimit)
+ break;
+ }
+ rdp->donelist = list;
+
+ local_irq_disable();
+ rdp->qlen -= count;
+ local_irq_enable();
+ if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
+ rdp->blimit = blimit;
+
+ if (!rdp->donelist)
+ rdp->donetail = &rdp->donelist;
+ else
+ tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
+}
+
+/*
+ * Grace period handling:
+ * The grace period handling consists out of two steps:
+ * - A new grace period is started.
+ * This is done by rcu_start_batch. The start is not broadcasted to
+ * all cpus, they must pick this up by comparing rcp->cur with
+ * rdp->quiescbatch. All cpus are recorded in the
+ * rcu_ctrlblk.cpumask bitmap.
+ * - All cpus must go through a quiescent state.
+ * Since the start of the grace period is not broadcasted, at least two
+ * calls to rcu_check_quiescent_state are required:
+ * The first call just notices that a new grace period is running. The
+ * following calls check if there was a quiescent state since the beginning
+ * of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
+ * the bitmap is empty, then the grace period is completed.
+ * rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
+ * period (if necessary).
+ */
+/*
+ * Register a new batch of callbacks, and start it up if there is currently no
+ * active batch and the batch to be registered has not already occurred.
+ * Caller must hold rcu_ctrlblk.lock.
+ */
+static void rcu_start_batch(struct rcu_ctrlblk *rcp)
+{
+ if (rcp->next_pending &&
+ rcp->completed == rcp->cur) {
+ rcp->next_pending = 0;
+ /*
+ * next_pending == 0 must be visible in
+ * __rcu_process_callbacks() before it can see new value of cur.
+ */
+ smp_wmb();
+ rcp->cur++;
+
+ /*
+ * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
+ * Barrier Otherwise it can cause tickless idle CPUs to be
+ * included in rcp->cpumask, which will extend graceperiods
+ * unnecessarily.
+ */
+ smp_mb();
+ cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
+
+ rcp->signaled = 0;
+ }
+}
+
+/*
+ * cpu went through a quiescent state since the beginning of the grace period.
+ * Clear it from the cpu mask and complete the grace period if it was the last
+ * cpu. Start another grace period if someone has further entries pending
+ */
+static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
+{
+ cpu_clear(cpu, rcp->cpumask);
+ if (cpus_empty(rcp->cpumask)) {
+ /* batch completed ! */
+ rcp->completed = rcp->cur;
+ rcu_start_batch(rcp);
+ }
+}
+
+/*
+ * Check if the cpu has gone through a quiescent state (say context
+ * switch). If so and if it already hasn't done so in this RCU
+ * quiescent cycle, then indicate that it has done so.
+ */
+static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
+ struct rcu_data *rdp)
+{
+ if (rdp->quiescbatch != rcp->cur) {
+ /* start new grace period: */
+ rdp->qs_pending = 1;
+ rdp->passed_quiesc = 0;
+ rdp->quiescbatch = rcp->cur;
+ return;
+ }
+
+ /* Grace period already completed for this cpu?
+ * qs_pending is checked instead of the actual bitmap to avoid
+ * cacheline trashing.
+ */
+ if (!rdp->qs_pending)
+ return;
+
+ /*
+ * Was there a quiescent state since the beginning of the grace
+ * period? If no, then exit and wait for the next call.
+ */
+ if (!rdp->passed_quiesc)
+ return;
+ rdp->qs_pending = 0;
+
+ spin_lock(&rcp->lock);
+ /*
+ * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
+ * during cpu startup. Ignore the quiescent state.
+ */
+ if (likely(rdp->quiescbatch == rcp->cur))
+ cpu_quiet(rdp->cpu, rcp);
+
+ spin_unlock(&rcp->lock);
+}
+
+
+#ifdef CONFIG_HOTPLUG_CPU
+
+/* warning! helper for rcu_offline_cpu. do not use elsewhere without reviewing
+ * locking requirements, the list it's pulling from has to belong to a cpu
+ * which is dead and hence not processing interrupts.
+ */
+static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
+ struct rcu_head **tail)
+{
+ local_irq_disable();
+ *this_rdp->nxttail = list;
+ if (list)
+ this_rdp->nxttail = tail;
+ local_irq_enable();
+}
+
+static void __rcu_offline_cpu(struct rcu_data *this_rdp,
+ struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
+{
+ /* if the cpu going offline owns the grace period
+ * we can block indefinitely waiting for it, so flush
+ * it here
+ */
+ spin_lock_bh(&rcp->lock);
+ if (rcp->cur != rcp->completed)
+ cpu_quiet(rdp->cpu, rcp);
+ spin_unlock_bh(&rcp->lock);
+ rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
+ rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
+ rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
+}
+
+static void rcu_offline_cpu(int cpu)
+{
+ struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
+ struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
+
+ __rcu_offline_cpu(this_rdp, &rcu_ctrlblk,
+ &per_cpu(rcu_data, cpu));
+ __rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk,
+ &per_cpu(rcu_bh_data, cpu));
+ put_cpu_var(rcu_data);
+ put_cpu_var(rcu_bh_data);
+ tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
+}
+
+#else
+
+static void rcu_offline_cpu(int cpu)
+{
+}
+
+#endif
+
+/*
+ * This does the RCU processing work from tasklet context.
+ */
+static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
+ struct rcu_data *rdp)
+{
+ if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
+ *rdp->donetail = rdp->curlist;
+ rdp->donetail = rdp->curtail;
+ rdp->curlist = NULL;
+ rdp->curtail = &rdp->curlist;
+ }
+
+ if (rdp->nxtlist && !rdp->curlist) {
+ local_irq_disable();
+ rdp->curlist = rdp->nxtlist;
+ rdp->curtail = rdp->nxttail;
+ rdp->nxtlist = NULL;
+ rdp->nxttail = &rdp->nxtlist;
+ local_irq_enable();
+
+ /*
+ * start the next batch of callbacks
+ */
+
+ /* determine batch number */
+ rdp->batch = rcp->cur + 1;
+ /* see the comment and corresponding wmb() in
+ * the rcu_start_batch()
+ */
+ smp_rmb();
+
+ if (!rcp->next_pending) {
+ /* and start it/schedule start if it's a new batch */
+ spin_lock(&rcp->lock);
+ rcp->next_pending = 1;
+ rcu_start_batch(rcp);
+ spin_unlock(&rcp->lock);
+ }
+ }
+
+ rcu_check_quiescent_state(rcp, rdp);
+ if (rdp->donelist)
+ rcu_do_batch(rdp);
+}
+
+static void rcu_process_callbacks(unsigned long unused)
+{
+ __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
+ __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
+}
+
+static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
+{
+ /* This cpu has pending rcu entries and the grace period
+ * for them has completed.
+ */
+ if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
+ return 1;
+
+ /* This cpu has no pending entries, but there are new entries */
+ if (!rdp->curlist && rdp->nxtlist)
+ return 1;
+
+ /* This cpu has finished callbacks to invoke */
+ if (rdp->donelist)
+ return 1;
+
+ /* The rcu core waits for a quiescent state from the cpu */
+ if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
+ return 1;
+
+ /* nothing to do */
+ return 0;
+}
+
+/*
+ * Check to see if there is any immediate RCU-related work to be done
+ * by the current CPU, returning 1 if so. This function is part of the
+ * RCU implementation; it is -not- an exported member of the RCU API.
+ */
+int rcu_pending(int cpu)
+{
+ return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
+ __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
+}
+
+/*
+ * Check to see if any future RCU-related work will need to be done
+ * by the current CPU, even if none need be done immediately, returning
+ * 1 if so. This function is part of the RCU implementation; it is -not-
+ * an exported member of the RCU API.
+ */
+int rcu_needs_cpu(int cpu)
+{
+ struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+ struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
+
+ return (!!rdp->curlist || !!rdp_bh->curlist || rcu_pending(cpu));
+}
+
+void rcu_check_callbacks(int cpu, int user)
+{
+ if (user ||
+ (idle_cpu(cpu) && !in_softirq() &&
+ hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
+ rcu_qsctr_inc(cpu);
+ rcu_bh_qsctr_inc(cpu);
+ } else if (!in_softirq())
+ rcu_bh_qsctr_inc(cpu);
+ tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
+}
+
+static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
+ struct rcu_data *rdp)
+{
+ memset(rdp, 0, sizeof(*rdp));
+ rdp->curtail = &rdp->curlist;
+ rdp->nxttail = &rdp->nxtlist;
+ rdp->donetail = &rdp->donelist;
+ rdp->quiescbatch = rcp->completed;
+ rdp->qs_pending = 0;
+ rdp->cpu = cpu;
+ rdp->blimit = blimit;
+}
+
+static void __devinit rcu_online_cpu(int cpu)
+{
+ struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+ struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
+
+ rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
+ rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
+ tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
+}
+
+static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
+ unsigned long action, void *hcpu)
+{
+ long cpu = (long)hcpu;
+ switch (action) {
+ case CPU_UP_PREPARE:
+ rcu_online_cpu(cpu);
+ break;
+ case CPU_DEAD:
+ rcu_offline_cpu(cpu);
+ break;
+ default:
+ break;
+ }
+ return NOTIFY_OK;
+}
+
+static struct notifier_block __cpuinitdata rcu_nb = {
+ .notifier_call = rcu_cpu_notify,
+};
+
+/*
+ * Initializes rcu mechanism. Assumed to be called early.
+ * That is before local timer(SMP) or jiffie timer (uniproc) is setup.
+ * Note that rcu_qsctr and friends are implicitly
+ * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
+ */
+void __init __rcu_init(void)
+{
+ rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
+ (void *)(long)smp_processor_id());
+ /* Register notifier for non-boot CPUs */
+ register_cpu_notifier(&rcu_nb);
+}
+
+module_param(blimit, int, 0);
+module_param(qhimark, int, 0);
+module_param(qlowmark, int, 0);
+EXPORT_SYMBOL_GPL(rcu_batches_completed);
+EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
+EXPORT_SYMBOL_GPL(call_rcu);
+EXPORT_SYMBOL_GPL(call_rcu_bh);
diff -puN kernel/rcupdate.c~rcu-split-classic kernel/rcupdate.c
--- linux-2.6.20-rc3-mm1-rcu/kernel/rcupdate.c~rcu-split-classic 2007-01-14 23:04:09.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcupdate.c 2007-01-15 15:36:09.000000000 +0530
@@ -15,7 +15,7 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
- * Copyright (C) IBM Corporation, 2001
+ * Copyright IBM Corporation, 2001
*
* Authors: Dipankar Sarma <[email protected]>
* Manfred Spraul <[email protected]>
@@ -35,157 +35,58 @@
#include <linux/init.h>
#include <linux/spinlock.h>
#include <linux/smp.h>
-#include <linux/rcupdate.h>
#include <linux/interrupt.h>
#include <linux/sched.h>
#include <asm/atomic.h>
#include <linux/bitops.h>
-#include <linux/module.h>
#include <linux/completion.h>
-#include <linux/moduleparam.h>
#include <linux/percpu.h>
-#include <linux/notifier.h>
#include <linux/rcupdate.h>
#include <linux/cpu.h>
#include <linux/mutex.h>
+#include <linux/module.h>

-/* Definition for rcupdate control block. */
-static struct rcu_ctrlblk rcu_ctrlblk = {
- .cur = -300,
- .completed = -300,
- .lock = __SPIN_LOCK_UNLOCKED(&rcu_ctrlblk.lock),
- .cpumask = CPU_MASK_NONE,
-};
-static struct rcu_ctrlblk rcu_bh_ctrlblk = {
- .cur = -300,
- .completed = -300,
- .lock = __SPIN_LOCK_UNLOCKED(&rcu_bh_ctrlblk.lock),
- .cpumask = CPU_MASK_NONE,
+struct rcu_synchronize {
+ struct rcu_head head;
+ struct completion completion;
};

-DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
-DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
-
-/* Fake initialization required by compiler */
-static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
-static int blimit = 10;
-static int qhimark = 10000;
-static int qlowmark = 100;
-
+static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head);
static atomic_t rcu_barrier_cpu_count;
static DEFINE_MUTEX(rcu_barrier_mutex);
static struct completion rcu_barrier_completion;

-#ifdef CONFIG_SMP
-static void force_quiescent_state(struct rcu_data *rdp,
- struct rcu_ctrlblk *rcp)
-{
- int cpu;
- cpumask_t cpumask;
- set_need_resched();
- if (unlikely(!rcp->signaled)) {
- rcp->signaled = 1;
- /*
- * Don't send IPI to itself. With irqs disabled,
- * rdp->cpu is the current cpu.
- */
- cpumask = rcp->cpumask;
- cpu_clear(rdp->cpu, cpumask);
- for_each_cpu_mask(cpu, cpumask)
- smp_send_reschedule(cpu);
- }
-}
-#else
-static inline void force_quiescent_state(struct rcu_data *rdp,
- struct rcu_ctrlblk *rcp)
+/* Because of FASTCALL declaration of complete, we use this wrapper */
+static void wakeme_after_rcu(struct rcu_head *head)
{
- set_need_resched();
+ struct rcu_synchronize *rcu;
+
+ rcu = container_of(head, struct rcu_synchronize, head);
+ complete(&rcu->completion);
}
-#endif

/**
- * call_rcu - Queue an RCU callback for invocation after a grace period.
- * @head: structure to be used for queueing the RCU updates.
- * @func: actual update function to be invoked after the grace period
+ * synchronize_rcu - wait until a grace period has elapsed.
*
- * The update function will be invoked some time after a full grace
- * period elapses, in other words after all currently executing RCU
+ * Control will return to the caller some time after a full grace
+ * period has elapsed, in other words after all currently executing RCU
* read-side critical sections have completed. RCU read-side critical
* sections are delimited by rcu_read_lock() and rcu_read_unlock(),
* and may be nested.
- */
-void fastcall call_rcu(struct rcu_head *head,
- void (*func)(struct rcu_head *rcu))
-{
- unsigned long flags;
- struct rcu_data *rdp;
-
- head->func = func;
- head->next = NULL;
- local_irq_save(flags);
- rdp = &__get_cpu_var(rcu_data);
- *rdp->nxttail = head;
- rdp->nxttail = &head->next;
- if (unlikely(++rdp->qlen > qhimark)) {
- rdp->blimit = INT_MAX;
- force_quiescent_state(rdp, &rcu_ctrlblk);
- }
- local_irq_restore(flags);
-}
-
-/**
- * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
- * @head: structure to be used for queueing the RCU updates.
- * @func: actual update function to be invoked after the grace period
*
- * The update function will be invoked some time after a full grace
- * period elapses, in other words after all currently executing RCU
- * read-side critical sections have completed. call_rcu_bh() assumes
- * that the read-side critical sections end on completion of a softirq
- * handler. This means that read-side critical sections in process
- * context must not be interrupted by softirqs. This interface is to be
- * used when most of the read-side critical sections are in softirq context.
- * RCU read-side critical sections are delimited by rcu_read_lock() and
- * rcu_read_unlock(), * if in interrupt context or rcu_read_lock_bh()
- * and rcu_read_unlock_bh(), if in process context. These may be nested.
+ * If your read-side code is not protected by rcu_read_lock(), do -not-
+ * use synchronize_rcu().
*/
-void fastcall call_rcu_bh(struct rcu_head *head,
- void (*func)(struct rcu_head *rcu))
+void synchronize_rcu(void)
{
- unsigned long flags;
- struct rcu_data *rdp;
-
- head->func = func;
- head->next = NULL;
- local_irq_save(flags);
- rdp = &__get_cpu_var(rcu_bh_data);
- *rdp->nxttail = head;
- rdp->nxttail = &head->next;
-
- if (unlikely(++rdp->qlen > qhimark)) {
- rdp->blimit = INT_MAX;
- force_quiescent_state(rdp, &rcu_bh_ctrlblk);
- }
-
- local_irq_restore(flags);
-}
+ struct rcu_synchronize rcu;

-/*
- * Return the number of RCU batches processed thus far. Useful
- * for debug and statistics.
- */
-long rcu_batches_completed(void)
-{
- return rcu_ctrlblk.completed;
-}
+ init_completion(&rcu.completion);
+ /* Will wake me after RCU finished */
+ call_rcu(&rcu.head, wakeme_after_rcu);

-/*
- * Return the number of RCU batches processed thus far. Useful
- * for debug and statistics.
- */
-long rcu_batches_completed_bh(void)
-{
- return rcu_bh_ctrlblk.completed;
+ /* Wait for it */
+ wait_for_completion(&rcu.completion);
}

static void rcu_barrier_callback(struct rcu_head *notused)
@@ -200,10 +101,8 @@ static void rcu_barrier_callback(struct
static void rcu_barrier_func(void *notused)
{
int cpu = smp_processor_id();
- struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
- struct rcu_head *head;
+ struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);

- head = &rdp->barrier;
atomic_inc(&rcu_barrier_cpu_count);
call_rcu(head, rcu_barrier_callback);
}
@@ -222,414 +121,11 @@ void rcu_barrier(void)
wait_for_completion(&rcu_barrier_completion);
mutex_unlock(&rcu_barrier_mutex);
}
-EXPORT_SYMBOL_GPL(rcu_barrier);
-
-/*
- * Invoke the completed RCU callbacks. They are expected to be in
- * a per-cpu list.
- */
-static void rcu_do_batch(struct rcu_data *rdp)
-{
- struct rcu_head *next, *list;
- int count = 0;
-
- list = rdp->donelist;
- while (list) {
- next = list->next;
- prefetch(next);
- list->func(list);
- list = next;
- if (++count >= rdp->blimit)
- break;
- }
- rdp->donelist = list;
-
- local_irq_disable();
- rdp->qlen -= count;
- local_irq_enable();
- if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
- rdp->blimit = blimit;
-
- if (!rdp->donelist)
- rdp->donetail = &rdp->donelist;
- else
- tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
-}
-
-/*
- * Grace period handling:
- * The grace period handling consists out of two steps:
- * - A new grace period is started.
- * This is done by rcu_start_batch. The start is not broadcasted to
- * all cpus, they must pick this up by comparing rcp->cur with
- * rdp->quiescbatch. All cpus are recorded in the
- * rcu_ctrlblk.cpumask bitmap.
- * - All cpus must go through a quiescent state.
- * Since the start of the grace period is not broadcasted, at least two
- * calls to rcu_check_quiescent_state are required:
- * The first call just notices that a new grace period is running. The
- * following calls check if there was a quiescent state since the beginning
- * of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
- * the bitmap is empty, then the grace period is completed.
- * rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
- * period (if necessary).
- */
-/*
- * Register a new batch of callbacks, and start it up if there is currently no
- * active batch and the batch to be registered has not already occurred.
- * Caller must hold rcu_ctrlblk.lock.
- */
-static void rcu_start_batch(struct rcu_ctrlblk *rcp)
-{
- if (rcp->next_pending &&
- rcp->completed == rcp->cur) {
- rcp->next_pending = 0;
- /*
- * next_pending == 0 must be visible in
- * __rcu_process_callbacks() before it can see new value of cur.
- */
- smp_wmb();
- rcp->cur++;
-
- /*
- * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
- * Barrier Otherwise it can cause tickless idle CPUs to be
- * included in rcp->cpumask, which will extend graceperiods
- * unnecessarily.
- */
- smp_mb();
- cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
-
- rcp->signaled = 0;
- }
-}
-
-/*
- * cpu went through a quiescent state since the beginning of the grace period.
- * Clear it from the cpu mask and complete the grace period if it was the last
- * cpu. Start another grace period if someone has further entries pending
- */
-static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
-{
- cpu_clear(cpu, rcp->cpumask);
- if (cpus_empty(rcp->cpumask)) {
- /* batch completed ! */
- rcp->completed = rcp->cur;
- rcu_start_batch(rcp);
- }
-}
-
-/*
- * Check if the cpu has gone through a quiescent state (say context
- * switch). If so and if it already hasn't done so in this RCU
- * quiescent cycle, then indicate that it has done so.
- */
-static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
- struct rcu_data *rdp)
-{
- if (rdp->quiescbatch != rcp->cur) {
- /* start new grace period: */
- rdp->qs_pending = 1;
- rdp->passed_quiesc = 0;
- rdp->quiescbatch = rcp->cur;
- return;
- }
-
- /* Grace period already completed for this cpu?
- * qs_pending is checked instead of the actual bitmap to avoid
- * cacheline trashing.
- */
- if (!rdp->qs_pending)
- return;
-
- /*
- * Was there a quiescent state since the beginning of the grace
- * period? If no, then exit and wait for the next call.
- */
- if (!rdp->passed_quiesc)
- return;
- rdp->qs_pending = 0;
-
- spin_lock(&rcp->lock);
- /*
- * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
- * during cpu startup. Ignore the quiescent state.
- */
- if (likely(rdp->quiescbatch == rcp->cur))
- cpu_quiet(rdp->cpu, rcp);
-
- spin_unlock(&rcp->lock);
-}
-
-
-#ifdef CONFIG_HOTPLUG_CPU
-
-/* warning! helper for rcu_offline_cpu. do not use elsewhere without reviewing
- * locking requirements, the list it's pulling from has to belong to a cpu
- * which is dead and hence not processing interrupts.
- */
-static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
- struct rcu_head **tail)
-{
- local_irq_disable();
- *this_rdp->nxttail = list;
- if (list)
- this_rdp->nxttail = tail;
- local_irq_enable();
-}
-
-static void __rcu_offline_cpu(struct rcu_data *this_rdp,
- struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
-{
- /* if the cpu going offline owns the grace period
- * we can block indefinitely waiting for it, so flush
- * it here
- */
- spin_lock_bh(&rcp->lock);
- if (rcp->cur != rcp->completed)
- cpu_quiet(rdp->cpu, rcp);
- spin_unlock_bh(&rcp->lock);
- rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
- rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
- rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
-}
-
-static void rcu_offline_cpu(int cpu)
-{
- struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
- struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
-
- __rcu_offline_cpu(this_rdp, &rcu_ctrlblk,
- &per_cpu(rcu_data, cpu));
- __rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk,
- &per_cpu(rcu_bh_data, cpu));
- put_cpu_var(rcu_data);
- put_cpu_var(rcu_bh_data);
- tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
-}
-
-#else

-static void rcu_offline_cpu(int cpu)
-{
-}
-
-#endif
-
-/*
- * This does the RCU processing work from tasklet context.
- */
-static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
- struct rcu_data *rdp)
-{
- if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
- *rdp->donetail = rdp->curlist;
- rdp->donetail = rdp->curtail;
- rdp->curlist = NULL;
- rdp->curtail = &rdp->curlist;
- }
-
- if (rdp->nxtlist && !rdp->curlist) {
- local_irq_disable();
- rdp->curlist = rdp->nxtlist;
- rdp->curtail = rdp->nxttail;
- rdp->nxtlist = NULL;
- rdp->nxttail = &rdp->nxtlist;
- local_irq_enable();
-
- /*
- * start the next batch of callbacks
- */
-
- /* determine batch number */
- rdp->batch = rcp->cur + 1;
- /* see the comment and corresponding wmb() in
- * the rcu_start_batch()
- */
- smp_rmb();
-
- if (!rcp->next_pending) {
- /* and start it/schedule start if it's a new batch */
- spin_lock(&rcp->lock);
- rcp->next_pending = 1;
- rcu_start_batch(rcp);
- spin_unlock(&rcp->lock);
- }
- }
-
- rcu_check_quiescent_state(rcp, rdp);
- if (rdp->donelist)
- rcu_do_batch(rdp);
-}
-
-static void rcu_process_callbacks(unsigned long unused)
-{
- __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
- __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
-}
-
-static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
-{
- /* This cpu has pending rcu entries and the grace period
- * for them has completed.
- */
- if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
- return 1;
-
- /* This cpu has no pending entries, but there are new entries */
- if (!rdp->curlist && rdp->nxtlist)
- return 1;
-
- /* This cpu has finished callbacks to invoke */
- if (rdp->donelist)
- return 1;
-
- /* The rcu core waits for a quiescent state from the cpu */
- if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
- return 1;
-
- /* nothing to do */
- return 0;
-}
-
-/*
- * Check to see if there is any immediate RCU-related work to be done
- * by the current CPU, returning 1 if so. This function is part of the
- * RCU implementation; it is -not- an exported member of the RCU API.
- */
-int rcu_pending(int cpu)
-{
- return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
- __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
-}
-
-/*
- * Check to see if any future RCU-related work will need to be done
- * by the current CPU, even if none need be done immediately, returning
- * 1 if so. This function is part of the RCU implementation; it is -not-
- * an exported member of the RCU API.
- */
-int rcu_needs_cpu(int cpu)
-{
- struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
- struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
-
- return (!!rdp->curlist || !!rdp_bh->curlist || rcu_pending(cpu));
-}
-
-void rcu_check_callbacks(int cpu, int user)
-{
- if (user ||
- (idle_cpu(cpu) && !in_softirq() &&
- hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
- rcu_qsctr_inc(cpu);
- rcu_bh_qsctr_inc(cpu);
- } else if (!in_softirq())
- rcu_bh_qsctr_inc(cpu);
- tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
-}
-
-static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
- struct rcu_data *rdp)
-{
- memset(rdp, 0, sizeof(*rdp));
- rdp->curtail = &rdp->curlist;
- rdp->nxttail = &rdp->nxtlist;
- rdp->donetail = &rdp->donelist;
- rdp->quiescbatch = rcp->completed;
- rdp->qs_pending = 0;
- rdp->cpu = cpu;
- rdp->blimit = blimit;
-}
-
-static void __devinit rcu_online_cpu(int cpu)
-{
- struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
- struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
-
- rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
- rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
- tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
-}
-
-static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
- unsigned long action, void *hcpu)
-{
- long cpu = (long)hcpu;
- switch (action) {
- case CPU_UP_PREPARE:
- rcu_online_cpu(cpu);
- break;
- case CPU_DEAD:
- rcu_offline_cpu(cpu);
- break;
- default:
- break;
- }
- return NOTIFY_OK;
-}
-
-static struct notifier_block __cpuinitdata rcu_nb = {
- .notifier_call = rcu_cpu_notify,
-};
-
-/*
- * Initializes rcu mechanism. Assumed to be called early.
- * That is before local timer(SMP) or jiffie timer (uniproc) is setup.
- * Note that rcu_qsctr and friends are implicitly
- * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
- */
void __init rcu_init(void)
{
- rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
- (void *)(long)smp_processor_id());
- /* Register notifier for non-boot CPUs */
- register_cpu_notifier(&rcu_nb);
-}
-
-struct rcu_synchronize {
- struct rcu_head head;
- struct completion completion;
-};
-
-/* Because of FASTCALL declaration of complete, we use this wrapper */
-static void wakeme_after_rcu(struct rcu_head *head)
-{
- struct rcu_synchronize *rcu;
-
- rcu = container_of(head, struct rcu_synchronize, head);
- complete(&rcu->completion);
-}
-
-/**
- * synchronize_rcu - wait until a grace period has elapsed.
- *
- * Control will return to the caller some time after a full grace
- * period has elapsed, in other words after all currently executing RCU
- * read-side critical sections have completed. RCU read-side critical
- * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
- * and may be nested.
- *
- * If your read-side code is not protected by rcu_read_lock(), do -not-
- * use synchronize_rcu().
- */
-void synchronize_rcu(void)
-{
- struct rcu_synchronize rcu;
-
- init_completion(&rcu.completion);
- /* Will wake me after RCU finished */
- call_rcu(&rcu.head, wakeme_after_rcu);
-
- /* Wait for it */
- wait_for_completion(&rcu.completion);
+ __rcu_init();
}
-
-module_param(blimit, int, 0);
-module_param(qhimark, int, 0);
-module_param(qlowmark, int, 0);
-EXPORT_SYMBOL_GPL(rcu_batches_completed);
-EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
-EXPORT_SYMBOL_GPL(call_rcu);
-EXPORT_SYMBOL_GPL(call_rcu_bh);
+
+EXPORT_SYMBOL_GPL(rcu_barrier);
EXPORT_SYMBOL_GPL(synchronize_rcu);

_

2007-01-15 19:23:28

by Dipankar Sarma

Subject: Re: [mm PATCH 2/6] RCU: softirq for RCU



Finally, RCU gets its own softirq. With RCU being used extensively,
the per-CPU tasklet used earlier was effectively just a softirq with
extra overhead; raising a dedicated RCU_SOFTIRQ directly is more efficient.

Signed-off-by: Dipankar Sarma <[email protected]>
---
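
For reference, this is the standard pattern for a dedicated softirq in the
2.6.20-era API (open_softirq() still took a third, data argument then); the
hypothetical FOO_SOFTIRQ below stands in for the RCU_SOFTIRQ entry the
patch adds to the softirq enum:

#include <linux/interrupt.h>

static void foo_process_callbacks(struct softirq_action *unused)
{
	/* deferred work runs here, in softirq context on the raising CPU */
}

static void foo_softirq_init(void)
{
	/* register the handler once; the data argument is unused here */
	open_softirq(FOO_SOFTIRQ, foo_process_callbacks, NULL);
}

static void foo_kick(void)
{
	/* mark the softirq pending; it runs at the next softirq point on this CPU */
	raise_softirq(FOO_SOFTIRQ);
}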



diff -puN include/linux/interrupt.h~rcu-softirq include/linux/interrupt.h
--- linux-2.6.20-rc3-mm1-rcu/include/linux/interrupt.h~rcu-softirq 2007-01-15 15:36:43.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/interrupt.h 2007-01-15 15:36:43.000000000 +0530
@@ -236,6 +236,7 @@ enum
#ifdef CONFIG_HIGH_RES_TIMERS
HRTIMER_SOFTIRQ,
#endif
+ RCU_SOFTIRQ /* Preferable RCU should always be the last softirq */
};

/* softirq mask and active fields moved to irq_cpustat_t in
diff -puN kernel/rcuclassic.c~rcu-softirq kernel/rcuclassic.c
--- linux-2.6.20-rc3-mm1-rcu/kernel/rcuclassic.c~rcu-softirq 2007-01-15 15:36:43.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcuclassic.c 2007-01-15 15:36:43.000000000 +0530
@@ -69,7 +69,6 @@ DEFINE_PER_CPU(struct rcu_data, rcu_data
DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };

/* Fake initialization required by compiler */
-static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
static int blimit = 10;
static int qhimark = 10000;
static int qlowmark = 100;
@@ -215,7 +214,7 @@ static void rcu_do_batch(struct rcu_data
if (!rdp->donelist)
rdp->donetail = &rdp->donelist;
else
- tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
+ raise_softirq(RCU_SOFTIRQ);
}

/*
@@ -367,7 +366,6 @@ static void rcu_offline_cpu(int cpu)
&per_cpu(rcu_bh_data, cpu));
put_cpu_var(rcu_data);
put_cpu_var(rcu_bh_data);
- tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
}

#else
@@ -379,7 +377,7 @@ static void rcu_offline_cpu(int cpu)
#endif

/*
- * This does the RCU processing work from tasklet context.
+ * This does the RCU processing work from softirq context.
*/
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
struct rcu_data *rdp)
@@ -424,7 +422,7 @@ static void __rcu_process_callbacks(stru
rcu_do_batch(rdp);
}

-static void rcu_process_callbacks(unsigned long unused)
+static void rcu_process_callbacks(struct softirq_action *unused)
{
__rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
__rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
@@ -488,7 +486,7 @@ void rcu_check_callbacks(int cpu, int us
rcu_bh_qsctr_inc(cpu);
} else if (!in_softirq())
rcu_bh_qsctr_inc(cpu);
- tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
+ raise_softirq(RCU_SOFTIRQ);
}

static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
@@ -511,7 +509,7 @@ static void __devinit rcu_online_cpu(int

rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
- tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
+ open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
}

static int __cpuinit rcu_cpu_notify(struct notifier_block *self,

_

2007-01-15 19:25:27

by Dipankar Sarma

Subject: Re: [mm PATCH 3/6] RCU: Fix barriers



Fix rcu_barrier() to work properly in a preemptible kernel environment.
Also, the ordering of callbacks must be preserved while moving
them to another CPU during CPU hotplug.

Signed-off-by: Dipankar Sarma <[email protected]>
---
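
Putting the pieces from kernel/rcupdate.c together, the fixed rcu_barrier()
path looks like this (condensed from code already present in this series,
with the comment paraphrased):

static void rcu_barrier_callback(struct rcu_head *notused)
{
	/* the last of the per-CPU callbacks wakes up the waiter */
	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
		complete(&rcu_barrier_completion);
}

static void rcu_barrier_func(void *notused)
{
	int cpu = smp_processor_id();
	struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);

	atomic_inc(&rcu_barrier_cpu_count);
	call_rcu(head, rcu_barrier_callback);
}

void rcu_barrier(void)
{
	mutex_lock(&rcu_barrier_mutex);
	init_completion(&rcu_barrier_completion);
	atomic_set(&rcu_barrier_cpu_count, 0);
	/*
	 * Hold off grace periods while every CPU queues its callback;
	 * otherwise one CPU's callback could run, drop the count to zero
	 * and complete() before the other CPUs have queued theirs.
	 */
	rcu_read_lock();
	on_each_cpu(rcu_barrier_func, NULL, 0, 1);
	rcu_read_unlock();
	wait_for_completion(&rcu_barrier_completion);
	mutex_unlock(&rcu_barrier_mutex);
}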



diff -puN kernel/rcuclassic.c~rcu-fix-barriers kernel/rcuclassic.c
--- linux-2.6.20-rc3-mm1-rcu/kernel/rcuclassic.c~rcu-fix-barriers 2007-01-15 15:36:47.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcuclassic.c 2007-01-15 15:36:47.000000000 +0530
@@ -350,9 +350,9 @@ static void __rcu_offline_cpu(struct rcu
if (rcp->cur != rcp->completed)
cpu_quiet(rdp->cpu, rcp);
spin_unlock_bh(&rcp->lock);
+ rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
- rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
}

static void rcu_offline_cpu(int cpu)
diff -puN kernel/rcupdate.c~rcu-fix-barriers kernel/rcupdate.c
--- linux-2.6.20-rc3-mm1-rcu/kernel/rcupdate.c~rcu-fix-barriers 2007-01-15 15:36:47.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcupdate.c 2007-01-15 15:36:47.000000000 +0530
@@ -117,7 +117,18 @@ void rcu_barrier(void)
mutex_lock(&rcu_barrier_mutex);
init_completion(&rcu_barrier_completion);
atomic_set(&rcu_barrier_cpu_count, 0);
+ /*
+ * The queueing of callbacks in all CPUs must be
+ * atomic with respect to RCU, otherwise one cpu may
+ * queue a callback, wait for a grace period, decrement
+ * barrier count and call complete(), while other CPUs
+ * haven't yet queued anything. So, we need to make sure
+ * that no grace period happens until all the callbacks
+ * are queued.
+ */
+ rcu_read_lock();
on_each_cpu(rcu_barrier_func, NULL, 0, 1);
+ rcu_read_unlock();
wait_for_completion(&rcu_barrier_completion);
mutex_unlock(&rcu_barrier_mutex);
}

_

2007-01-15 19:29:46

by Dipankar Sarma

Subject: Re: [mm PATCH 4/6] RCU: preemptible RCU

From: Paul McKenney <[email protected]>

This patch implements a new version of RCU which allows its read-side
critical sections to be preempted. It uses a set of counter pairs
to keep track of the read-side critical sections and flips them
once all tasks have exited their read-side critical sections. The details
of this implementation can be found in this paper -

http://www.rdrop.com/users/paulmck/RCU/OLSrtRCU.2006.08.11a.pdf

This patch was developed as part of the -rt kernel
development and is meant to provide better latencies, since
RCU read-side critical sections no longer disable preemption.
As a consequence of keeping track of RCU readers, the readers
incur a slight overhead (the paper describes optimizations).
This implementation co-exists with the "classic" RCU
implementation and can be selected at compile time.

Signed-off-by: Paul McKenney <[email protected]>
Signed-off-by: Dipankar Sarma <[email protected]>
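
As a rough mental model of the counter-pair scheme described above, here
is an illustrative userspace toy. It is not the patch's code: the two
task_struct fields the patch adds appear as plain struct members, and
memory ordering, per-CPU access rules and the real grace-period state
machine are all ignored.

#include <stdio.h>

#define NR_CPUS 2

static int flipctr[NR_CPUS][2];	/* one counter pair per CPU */
static int completed;		/* global flip index, low bit selects a counter */

struct task {
	int rcu_read_lock_nesting;
	int rcu_flipctr_idx;	/* which counter this task incremented */
};

static void rcu_read_lock_model(struct task *t, int cpu)
{
	if (t->rcu_read_lock_nesting++ == 0) {
		t->rcu_flipctr_idx = completed & 1;	/* sample the current index */
		flipctr[cpu][t->rcu_flipctr_idx]++;
	}
}

static void rcu_read_unlock_model(struct task *t, int cpu)
{
	if (--t->rcu_read_lock_nesting == 0)
		flipctr[cpu][t->rcu_flipctr_idx]--;	/* may run on another CPU in reality */
}

/* A grace period can end only after the counters for the old index drain to zero. */
static int old_readers_gone(void)
{
	int cpu, sum = 0, old = (completed - 1) & 1;

	for (cpu = 0; cpu < NR_CPUS; cpu++)
		sum += flipctr[cpu][old];
	return sum == 0;
}

int main(void)
{
	struct task t = { 0, 0 };

	rcu_read_lock_model(&t, 0);
	completed++;			/* the update side flips the index */
	printf("old readers gone? %d\n", old_readers_gone());	/* 0: reader still active */
	rcu_read_unlock_model(&t, 0);
	printf("old readers gone? %d\n", old_readers_gone());	/* 1: grace period may end */
	return 0;
}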



diff -puN include/linux/init_task.h~rcu-preempt include/linux/init_task.h
--- linux-2.6.20-rc3-mm1-rcu/include/linux/init_task.h~rcu-preempt 2007-01-15 15:36:51.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/init_task.h 2007-01-15 15:36:51.000000000 +0530
@@ -90,6 +90,14 @@ extern struct nsproxy init_nsproxy;

extern struct group_info init_groups;

+#ifdef CONFIG_PREEMPT_RCU
+#define INIT_PREEMPT_RCU \
+ .rcu_read_lock_nesting = 0, \
+ .rcu_flipctr_idx = 0,
+#else
+#define INIT_PREEMPT_RCU
+#endif
+
/*
* INIT_TASK is used to set up the first task table, touch at
* your own risk!. Base=0, limit=0x1fffff (=2MB)
@@ -111,6 +119,7 @@ extern struct group_info init_groups;
.run_list = LIST_HEAD_INIT(tsk.run_list), \
.ioprio = 0, \
.time_slice = HZ, \
+ INIT_PREEMPT_RCU \
.tasks = LIST_HEAD_INIT(tsk.tasks), \
.ptrace_children= LIST_HEAD_INIT(tsk.ptrace_children), \
.ptrace_list = LIST_HEAD_INIT(tsk.ptrace_list), \
diff -puN include/linux/rcuclassic.h~rcu-preempt include/linux/rcuclassic.h
--- linux-2.6.20-rc3-mm1-rcu/include/linux/rcuclassic.h~rcu-preempt 2007-01-15 15:36:51.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/rcuclassic.h 2007-01-15 15:36:51.000000000 +0530
@@ -142,7 +142,6 @@ extern int rcu_needs_cpu(int cpu);
extern void __rcu_init(void);
extern void rcu_check_callbacks(int cpu, int user);
extern void rcu_restart_cpu(int cpu);
-extern long rcu_batches_completed(void);

#endif /* __KERNEL__ */
#endif /* __LINUX_RCUCLASSIC_H */
diff -puN include/linux/rcupdate.h~rcu-preempt include/linux/rcupdate.h
--- linux-2.6.20-rc3-mm1-rcu/include/linux/rcupdate.h~rcu-preempt 2007-01-15 15:36:51.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/rcupdate.h 2007-01-15 15:36:51.000000000 +0530
@@ -41,7 +41,11 @@
#include <linux/percpu.h>
#include <linux/cpumask.h>
#include <linux/seqlock.h>
+#ifdef CONFIG_CLASSIC_RCU
#include <linux/rcuclassic.h>
+#else
+#include <linux/rcupreempt.h>
+#endif

/**
* struct rcu_head - callback structure for use with RCU
@@ -216,10 +220,13 @@ extern void FASTCALL(call_rcu_bh(struct
/* Exported common interfaces */
extern void synchronize_rcu(void);
extern void rcu_barrier(void);
+extern long rcu_batches_completed(void);
+extern long rcu_batches_completed_bh(void);

/* Internal to kernel */
extern void rcu_init(void);
extern void rcu_check_callbacks(int cpu, int user);
+extern int rcu_needs_cpu(int cpu);

#endif /* __KERNEL__ */
#endif /* __LINUX_RCUPDATE_H */
diff -puN /dev/null include/linux/rcupreempt.h
--- /dev/null 2006-03-26 18:34:52.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/rcupreempt.h 2007-01-15 15:36:51.000000000 +0530
@@ -0,0 +1,65 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion (RT implementation)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright IBM Corporation, 2006
+ *
+ * Author: Paul McKenney <[email protected]>
+ *
+ * Based on the original work by Paul McKenney <[email protected]>
+ * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
+ * Papers:
+ * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
+ * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * http://lse.sourceforge.net/locking/rcupdate.html
+ *
+ */
+
+#ifndef __LINUX_RCUPREEMPT_H
+#define __LINUX_RCUPREEMPT_H
+
+#ifdef __KERNEL__
+
+#include <linux/cache.h>
+#include <linux/spinlock.h>
+#include <linux/threads.h>
+#include <linux/percpu.h>
+#include <linux/cpumask.h>
+#include <linux/seqlock.h>
+
+#define rcu_qsctr_inc(cpu)
+#define rcu_bh_qsctr_inc(cpu)
+#define call_rcu_bh(head, rcu) call_rcu(head, rcu)
+
+extern void __rcu_read_lock(void);
+extern void __rcu_read_unlock(void);
+extern int rcu_pending(int cpu);
+
+#define __rcu_read_lock_bh() { rcu_read_lock(); local_bh_disable(); }
+#define __rcu_read_unlock_bh() { local_bh_enable(); rcu_read_unlock(); }
+
+#define __rcu_read_lock_nesting() (current->rcu_read_lock_nesting)
+
+extern void __synchronize_sched(void);
+
+extern void __rcu_init(void);
+extern void rcu_check_callbacks(int cpu, int user);
+extern void rcu_restart_cpu(int cpu);
+
+#endif /* __KERNEL__ */
+#endif /* __LINUX_RCUPREEMPT_H */
diff -puN include/linux/sched.h~rcu-preempt include/linux/sched.h
--- linux-2.6.20-rc3-mm1-rcu/include/linux/sched.h~rcu-preempt 2007-01-15 15:36:51.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/sched.h 2007-01-15 15:36:51.000000000 +0530
@@ -848,6 +848,11 @@ struct task_struct {
cpumask_t cpus_allowed;
unsigned int time_slice, first_time_slice;

+#ifdef CONFIG_PREEMPT_RCU
+ int rcu_read_lock_nesting;
+ int rcu_flipctr_idx;
+#endif
+
#if defined(CONFIG_SCHEDSTATS) || defined(CONFIG_TASK_DELAY_ACCT)
struct sched_info sched_info;
#endif
diff -puN kernel/fork.c~rcu-preempt kernel/fork.c
--- linux-2.6.20-rc3-mm1-rcu/kernel/fork.c~rcu-preempt 2007-01-15 15:36:51.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/fork.c 2007-01-15 15:36:51.000000000 +0530
@@ -941,6 +941,16 @@ static inline void rt_mutex_init_task(st
#endif
}

+#ifdef CONFIG_PREEMPT_RCU
+static inline void rcu_task_init(struct task_struct *p)
+{
+ p->rcu_read_lock_nesting = 0;
+ p->rcu_flipctr_idx = 0;
+}
+#else
+static inline void rcu_task_init(struct task_struct *p) {}
+#endif
+
/*
* This creates a new process as a copy of the old one,
* but does not actually start it yet.
@@ -1026,6 +1036,7 @@ static struct task_struct *copy_process(

INIT_LIST_HEAD(&p->children);
INIT_LIST_HEAD(&p->sibling);
+ rcu_task_init(p);
p->vfork_done = NULL;
spin_lock_init(&p->alloc_lock);

diff -puN kernel/Kconfig.preempt~rcu-preempt kernel/Kconfig.preempt
--- linux-2.6.20-rc3-mm1-rcu/kernel/Kconfig.preempt~rcu-preempt 2007-01-15 15:36:51.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/Kconfig.preempt 2007-01-15 15:36:51.000000000 +0530
@@ -63,3 +63,29 @@ config PREEMPT_BKL
Say Y here if you are building a kernel for a desktop system.
Say N if you are unsure.

+choice
+ prompt "RCU implementation type:"
+ default CLASSIC_RCU
+
+config CLASSIC_RCU
+ bool "Classic RCU"
+ help
+ This option selects the classic RCU implementation that is
+ designed for best read-side performance on non-realtime
+ systems.
+
+ Say Y if you are unsure.
+
+config PREEMPT_RCU
+ bool "Preemptible RCU"
+ help
+ This option reduces the latency of the kernel by making certain
+ RCU sections preemptible. Normally RCU code is non-preemptible, if
+ this option is selected then read-only RCU sections become
+ preemptible. This helps latency, but may expose bugs due to
+ now-naive assumptions about each RCU read-side critical section
+ remaining on a given CPU through its execution.
+
+ Say N if you are unsure.
+
+endchoice
diff -puN kernel/Makefile~rcu-preempt kernel/Makefile
--- linux-2.6.20-rc3-mm1-rcu/kernel/Makefile~rcu-preempt 2007-01-15 15:36:51.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/Makefile 2007-01-15 15:36:51.000000000 +0530
@@ -6,9 +6,9 @@ obj-y = sched.o fork.o exec_domain.o
exit.o itimer.o time.o softirq.o resource.o \
sysctl.o capability.o ptrace.o timer.o user.o user_namespace.o \
signal.o sys.o kmod.o workqueue.o pid.o \
- rcupdate.o rcuclassic.o extable.o params.o posix-timers.o \
+ extable.o params.o posix-timers.o \
kthread.o wait.o kfifo.o sys_ni.o posix-cpu-timers.o mutex.o \
- hrtimer.o rwsem.o latency.o nsproxy.o srcu.o
+ hrtimer.o rwsem.o latency.o nsproxy.o rcupdate.o srcu.o

obj-$(CONFIG_STACKTRACE) += stacktrace.o
obj-y += time/
@@ -46,6 +46,8 @@ obj-$(CONFIG_DETECT_SOFTLOCKUP) += softl
obj-$(CONFIG_GENERIC_HARDIRQS) += irq/
obj-$(CONFIG_SECCOMP) += seccomp.o
obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o
+obj-$(CONFIG_CLASSIC_RCU) += rcuclassic.o
+obj-$(CONFIG_PREEMPT_RCU) += rcupreempt.o
obj-$(CONFIG_DEBUG_SYNCHRO_TEST) += synchro-test.o
obj-$(CONFIG_RELAY) += relay.o
obj-$(CONFIG_UTS_NS) += utsname.o
diff -puN /dev/null kernel/rcupreempt.c
--- /dev/null 2006-03-26 18:34:52.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcupreempt.c 2007-01-15 15:36:51.000000000 +0530
@@ -0,0 +1,595 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion, realtime implementation
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright IBM Corporation, 2006
+ *
+ * Authors: Paul E. McKenney <[email protected]>
+ * With thanks to Esben Nielsen, Bill Huey, and Ingo Molnar
+ * for pushing me away from locks and towards counters, and
+ * to Suparna Bhattacharya for pushing me completely away
+ * from atomic instructions on the read side.
+ *
+ * Papers: http://www.rdrop.com/users/paulmck/RCU
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * Documentation/RCU/ *.txt
+ *
+ */
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/spinlock.h>
+#include <linux/smp.h>
+#include <linux/rcupdate.h>
+#include <linux/interrupt.h>
+#include <linux/sched.h>
+#include <asm/atomic.h>
+#include <linux/bitops.h>
+#include <linux/module.h>
+#include <linux/completion.h>
+#include <linux/moduleparam.h>
+#include <linux/percpu.h>
+#include <linux/notifier.h>
+#include <linux/rcupdate.h>
+#include <linux/cpu.h>
+#include <linux/random.h>
+#include <linux/delay.h>
+#include <linux/byteorder/swabb.h>
+#include <linux/cpumask.h>
+
+/*
+ * PREEMPT_RCU data structures.
+ */
+
+struct rcu_data {
+ spinlock_t lock;
+ long completed; /* Number of last completed batch. */
+ struct rcu_head *nextlist;
+ struct rcu_head **nexttail;
+ struct rcu_head *waitlist;
+ struct rcu_head **waittail;
+ struct rcu_head *donelist;
+ struct rcu_head **donetail;
+};
+struct rcu_ctrlblk {
+ spinlock_t fliplock;
+ long completed; /* Number of last completed batch. */
+};
+static struct rcu_data rcu_data;
+static struct rcu_ctrlblk rcu_ctrlblk = {
+ .fliplock = SPIN_LOCK_UNLOCKED,
+ .completed = 0,
+};
+static DEFINE_PER_CPU(int [2], rcu_flipctr) = { 0, 0 };
+
+/*
+ * States for rcu_try_flip() and friends.
+ */
+enum rcu_try_flip_state {
+ RCU_TRY_FLIP_IDLE, /* "I" */
+ RCU_TRY_FLIP_GP, /* "G" */
+ RCU_TRY_FLIP_WAITACK, /* "A" */
+ RCU_TRY_FLIP_WAITZERO, /* "Z" */
+ RCU_TRY_FLIP_WAITMB /* "M" */
+};
+static enum rcu_try_flip_state rcu_try_flip_state = RCU_TRY_FLIP_IDLE;
+
+/*
+ * Enum and per-CPU flag to determine when each CPU has seen
+ * the most recent counter flip.
+ */
+enum rcu_flip_flag_value {
+ RCU_FLIP_SEEN, /* Steady/initial state, last flip seen. */
+ /* Only GP detector can update. */
+ RCU_FLIPPED /* Flip just completed, need confirmation. */
+ /* Only corresponding CPU can update. */
+};
+static DEFINE_PER_CPU(enum rcu_flip_flag_value, rcu_flip_flag) = RCU_FLIP_SEEN;
+
+/*
+ * Enum and per-CPU flag to determine when each CPU has executed the
+ * needed memory barrier to fence in memory references from its last RCU
+ * read-side critical section in the just-completed grace period.
+ */
+enum rcu_mb_flag_value {
+ RCU_MB_DONE, /* Steady/initial state, no mb()s required. */
+ /* Only GP detector can update. */
+ RCU_MB_NEEDED /* Flip just completed, need an mb(). */
+ /* Only corresponding CPU can update. */
+};
+static DEFINE_PER_CPU(enum rcu_mb_flag_value, rcu_mb_flag) = RCU_MB_DONE;
+
+/*
+ * Return the number of RCU batches processed thus far. Useful
+ * for debug and statistics.
+ */
+long rcu_batches_completed(void)
+{
+ return rcu_ctrlblk.completed;
+}
+
+long rcu_batches_completed_bh(void)
+{
+ return rcu_ctrlblk.completed;
+}
+
+void __rcu_read_lock(void)
+{
+ int idx;
+ int nesting;
+
+ nesting = current->rcu_read_lock_nesting;
+ if (nesting != 0) {
+
+ /* An earlier rcu_read_lock() covers us, just count this one. */
+ current->rcu_read_lock_nesting = nesting + 1;
+
+ } else {
+ unsigned long oldirq;
+
+ /*
+ * Disable local interrupts to prevent the grace-period
+ * detection state machine from seeing us half-done.
+ */
+ local_irq_save(oldirq);
+
+ /*
+ * Outermost nesting of rcu_read_lock(), so atomically
+ * increment the current counter for the current CPU.
+ */
+ idx = rcu_ctrlblk.completed & 0x1;
+ smp_read_barrier_depends();
+ barrier();
+ __get_cpu_var(rcu_flipctr)[idx]++;
+ barrier();
+
+ /*
+ * Now that the per-CPU counter has been incremented, we
+ * are protected. We can therefore safely increment
+ * the nesting counter, relieving further NMIs of the
+ * need to do so.
+ */
+ current->rcu_read_lock_nesting = nesting + 1;
+ barrier();
+
+ /*
+ * Now that we have prevented any NMIs from storing
+ * to the ->rcu_flipctr_idx, we can safely use it to
+ * remember which counter to decrement in the matching
+ * rcu_read_unlock().
+ */
+ current->rcu_flipctr_idx = idx;
+ local_irq_restore(oldirq);
+ }
+}
+
+void __rcu_read_unlock(void)
+{
+ int idx;
+ int nesting;
+
+ nesting = current->rcu_read_lock_nesting;
+ if (nesting > 1) {
+ /*
+ * We are still protected by an enclosing rcu_read_lock(),
+ * so simply decrement the counter.
+ */
+ current->rcu_read_lock_nesting = nesting - 1;
+
+ } else {
+ unsigned long oldirq;
+
+ /*
+ * Disable local interrupts to prevent the grace-period
+ * detection state machine from seeing us half-done.
+ */
+ local_irq_save(oldirq);
+
+ /*
+ * Outermost nesting of rcu_read_unlock(), so we must
+ * decrement the current counter for the current CPU.
+ * This must be done carefully, because NMIs can
+ * occur at any point in this code, and any rcu_read_lock()
+ * and rcu_read_unlock() pairs in the NMI handlers
+ * must interact non-destructively with this code.
+ * Lots of barrier() calls, and -very- careful ordering.
+ *
+ * Changes to this code, including this one, must be
+ * inspected, validated, and tested extremely carefully!!!
+ */
+
+ /*
+ * First, pick up the index. Enforce ordering for
+ * both compilers and for DEC Alpha.
+ */
+ idx = current->rcu_flipctr_idx;
+ smp_read_barrier_depends();
+ barrier();
+
+ /*
+ * It is now safe to decrement the task's nesting count.
+ * NMIs that occur after this statement will route
+ * their rcu_read_lock() calls through this "else" clause
+ * of this "if" statement, and thus will start incrementing
+ * the per-CPU counter on their own. Enforce ordering for
+ * compilers.
+ */
+ current->rcu_read_lock_nesting = nesting - 1;
+ barrier();
+
+ /*
+ * Decrement the per-CPU counter. NMI handlers
+ * might increment it as well, but they had better
+ * properly nest their rcu_read_lock()/rcu_read_unlock()
+ * pairs so that the value is restored before the handler
+ * returns to us.
+ */
+ __get_cpu_var(rcu_flipctr)[idx]--;
+ local_irq_restore(oldirq);
+ }
+}
+
+static void __rcu_advance_callbacks(void)
+{
+ if ((rcu_data.completed >> 1) != (rcu_ctrlblk.completed >> 1)) {
+ if (rcu_data.waitlist != NULL) {
+ *rcu_data.donetail = rcu_data.waitlist;
+ rcu_data.donetail = rcu_data.waittail;
+ }
+ if (rcu_data.nextlist != NULL) {
+ rcu_data.waitlist = rcu_data.nextlist;
+ rcu_data.waittail = rcu_data.nexttail;
+ rcu_data.nextlist = NULL;
+ rcu_data.nexttail = &rcu_data.nextlist;
+ } else {
+ rcu_data.waitlist = NULL;
+ rcu_data.waittail = &rcu_data.waitlist;
+ }
+ rcu_data.completed = rcu_ctrlblk.completed;
+ } else if (rcu_data.completed != rcu_ctrlblk.completed)
+ rcu_data.completed = rcu_ctrlblk.completed;
+}
+
+/*
+ * Get here when RCU is idle. Decide whether we need to
+ * move out of idle state, and return zero if so.
+ * "Straightforward" approach for the moment, might later
+ * use callback-list lengths, grace-period duration, or
+ * some such to determine when to exit idle state.
+ * Might also need a pre-idle test that does not acquire
+ * the lock, but let's get the simple case working first...
+ */
+static int rcu_try_flip_idle(int flipctr)
+{
+ if (!rcu_pending(smp_processor_id()))
+ return 1;
+ return 0;
+}
+
+/*
+ * Flip processing up to and including the flip, as well as
+ * telling CPUs to acknowledge the flip.
+ */
+static int rcu_try_flip_in_gp(int flipctr)
+{
+ int cpu;
+
+ /*
+ * Do the flip.
+ */
+ rcu_ctrlblk.completed++; /* stands in for rcu_try_flip_g2 */
+
+ /*
+ * Need a memory barrier so that other CPUs see the new
+ * counter value before they see the subsequent change of all
+ * the rcu_flip_flag instances to RCU_FLIPPED.
+ */
+ smp_mb();
+
+ /* Now ask each CPU for acknowledgement of the flip. */
+
+ for_each_possible_cpu(cpu)
+ per_cpu(rcu_flip_flag, cpu) = RCU_FLIPPED;
+
+ return 0;
+}
+
+/*
+ * Wait for CPUs to acknowledge the flip.
+ */
+static int rcu_try_flip_waitack(int flipctr)
+{
+ int cpu;
+
+ for_each_possible_cpu(cpu)
+ if (per_cpu(rcu_flip_flag, cpu) != RCU_FLIP_SEEN)
+ return 1;
+
+ /*
+ * Make sure our checks above don't bleed into subsequent
+ * waiting for the sum of the counters to reach zero.
+ */
+ smp_mb();
+ return 0;
+}
+
+/*
+ * Wait for collective ``last'' counter to reach zero,
+ * then tell all CPUs to do an end-of-grace-period memory barrier.
+ */
+static int rcu_try_flip_waitzero(int flipctr)
+{
+ int cpu;
+ int lastidx = !(flipctr & 0x1);
+ int sum = 0;
+
+ /* Check to see if the sum of the "last" counters is zero. */
+
+ for_each_possible_cpu(cpu)
+ sum += per_cpu(rcu_flipctr, cpu)[lastidx];
+ if (sum != 0)
+ return 1;
+
+ /* Make sure we don't call for memory barriers before we see zero. */
+ smp_mb();
+
+ /* Call for a memory barrier from each CPU. */
+ for_each_possible_cpu(cpu)
+ per_cpu(rcu_mb_flag, cpu) = RCU_MB_NEEDED;
+
+ return 0;
+}
+
+/*
+ * Wait for all CPUs to do their end-of-grace-period memory barrier.
+ * Return 0 once all CPUs have done so.
+ */
+static int rcu_try_flip_waitmb(int flipctr)
+{
+ int cpu;
+
+ for_each_possible_cpu(cpu)
+ if (per_cpu(rcu_mb_flag, cpu) != RCU_MB_DONE)
+ return 1;
+
+ smp_mb(); /* Ensure that the above checks precede any following flip. */
+ return 0;
+}
+
+/*
+ * Attempt a single flip of the counters. Remember, a single flip does
+ * -not- constitute a grace period. Instead, the interval between
+ * at least three consecutive flips is a grace period.
+ *
+ * If anyone is nuts enough to run this CONFIG_PREEMPT_RCU implementation
+ * on a large SMP, they might want to use a hierarchical organization of
+ * the per-CPU-counter pairs.
+ */
+static void rcu_try_flip(void)
+{
+ long flipctr;
+ unsigned long oldirq;
+
+ if (unlikely(!spin_trylock_irqsave(&rcu_ctrlblk.fliplock, oldirq)))
+ return;
+
+ /*
+ * Take the next transition(s) through the RCU grace-period
+ * flip-counter state machine.
+ */
+ flipctr = rcu_ctrlblk.completed;
+ switch (rcu_try_flip_state) {
+ case RCU_TRY_FLIP_IDLE:
+ if (rcu_try_flip_idle(flipctr))
+ break;
+ rcu_try_flip_state = RCU_TRY_FLIP_GP;
+ case RCU_TRY_FLIP_GP:
+ if (rcu_try_flip_in_gp(flipctr))
+ break;
+ rcu_try_flip_state = RCU_TRY_FLIP_WAITACK;
+ case RCU_TRY_FLIP_WAITACK:
+ if (rcu_try_flip_waitack(flipctr))
+ break;
+ rcu_try_flip_state = RCU_TRY_FLIP_WAITZERO;
+ case RCU_TRY_FLIP_WAITZERO:
+ if (rcu_try_flip_waitzero(flipctr))
+ break;
+ rcu_try_flip_state = RCU_TRY_FLIP_WAITMB;
+ case RCU_TRY_FLIP_WAITMB:
+ if (rcu_try_flip_waitmb(flipctr))
+ break;
+ rcu_try_flip_state = RCU_TRY_FLIP_IDLE;
+ }
+ spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
+}
+
+/*
+ * Check to see if this CPU needs to report that it has seen the most
+ * recent counter flip, thereby declaring that all subsequent
+ * rcu_read_lock() invocations will respect this flip.
+ */
+static void rcu_check_flipseen(int cpu)
+{
+ if (per_cpu(rcu_flip_flag, cpu) == RCU_FLIPPED) {
+ smp_mb(); /* Subsequent counter accesses must see new value */
+ per_cpu(rcu_flip_flag, cpu) = RCU_FLIP_SEEN;
+ smp_mb(); /* probably be implied by interrupt, but... */
+ }
+}
+
+/*
+ * Check to see if this CPU needs to do a memory barrier in order to
+ * ensure that any prior RCU read-side critical sections have committed
+ * their counter manipulations and critical-section memory references
+ * before declaring the grace period to be completed.
+ */
+static void rcu_check_mb(int cpu)
+{
+ if (per_cpu(rcu_mb_flag, cpu) == RCU_MB_NEEDED) {
+ smp_mb();
+ per_cpu(rcu_mb_flag, cpu) = RCU_MB_DONE;
+ }
+}
+
+/*
+ * This function is periodically called from hardware-irq context on
+ * each CPU.
+ */
+void rcu_check_callbacks(int cpu, int user)
+{
+ unsigned long oldirq;
+
+ rcu_check_flipseen(cpu);
+ rcu_check_mb(cpu);
+ if (rcu_ctrlblk.completed == rcu_data.completed) {
+ rcu_try_flip();
+ if (rcu_ctrlblk.completed == rcu_data.completed) {
+ return;
+ }
+ }
+ spin_lock_irqsave(&rcu_data.lock, oldirq);
+ __rcu_advance_callbacks();
+ if (rcu_data.donelist == NULL)
+ spin_unlock_irqrestore(&rcu_data.lock, oldirq);
+ else {
+ spin_unlock_irqrestore(&rcu_data.lock, oldirq);
+ raise_softirq(RCU_SOFTIRQ);
+ }
+}
+
+/*
+ * Check to see if any future RCU-related work will need to be done
+ * by the current CPU, even if none need be done immediately, returning
+ * 1 if so. This function is part of the RCU implementation; it is -not-
+ * an exported member of the RCU API.
+ */
+int rcu_needs_cpu(int cpu)
+{
+ return rcu_pending(cpu);
+}
+
+/*
+ * Needed by dynticks, to make sure all RCU processing has finished
+ * when we go idle:
+ */
+void rcu_advance_callbacks(int cpu, int user)
+{
+ unsigned long oldirq;
+
+ if (rcu_ctrlblk.completed == rcu_data.completed) {
+ rcu_try_flip();
+ if (rcu_ctrlblk.completed == rcu_data.completed) {
+ return;
+ }
+ }
+ spin_lock_irqsave(&rcu_data.lock, oldirq);
+ __rcu_advance_callbacks();
+ spin_unlock_irqrestore(&rcu_data.lock, oldirq);
+}
+
+static void rcu_process_callbacks(struct softirq_action *unused)
+{
+ unsigned long flags;
+ struct rcu_head *next, *list;
+
+ spin_lock_irqsave(&rcu_data.lock, flags);
+ list = rcu_data.donelist;
+ if (list == NULL) {
+ spin_unlock_irqrestore(&rcu_data.lock, flags);
+ return;
+ }
+ rcu_data.donelist = NULL;
+ rcu_data.donetail = &rcu_data.donelist;
+ spin_unlock_irqrestore(&rcu_data.lock, flags);
+ while (list) {
+ next = list->next;
+ list->func(list);
+ list = next;
+ }
+}
+
+void fastcall call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *rcu))
+{
+ unsigned long flags;
+
+ head->func = func;
+ head->next = NULL;
+ spin_lock_irqsave(&rcu_data.lock, flags);
+ __rcu_advance_callbacks();
+ *rcu_data.nexttail = head;
+ rcu_data.nexttail = &head->next;
+ spin_unlock_irqrestore(&rcu_data.lock, flags);
+}
+
+/*
+ * Wait until all currently running preempt_disable() code segments
+ * (including hardware-irq-disable segments) complete. Note that
+ * in -rt this does -not- necessarily result in all currently executing
+ * interrupt -handlers- having completed.
+ */
+void __synchronize_sched(void)
+{
+ cpumask_t oldmask;
+ int cpu;
+
+ if (sched_getaffinity(0, &oldmask) < 0) {
+ oldmask = cpu_possible_map;
+ }
+ for_each_online_cpu(cpu) {
+ sched_setaffinity(0, cpumask_of_cpu(cpu));
+ schedule();
+ }
+ sched_setaffinity(0, oldmask);
+}
+
+int rcu_pending(int cpu)
+{
+ return (rcu_data.donelist != NULL ||
+ rcu_data.waitlist != NULL ||
+ rcu_data.nextlist != NULL);
+}
+
+void __init __rcu_init(void)
+{
+ spin_lock_init(&rcu_data.lock);
+ rcu_data.completed = 0;
+ rcu_data.nextlist = NULL;
+ rcu_data.nexttail = &rcu_data.nextlist;
+ rcu_data.waitlist = NULL;
+ rcu_data.waittail = &rcu_data.waitlist;
+ rcu_data.donelist = NULL;
+ rcu_data.donetail = &rcu_data.donelist;
+ open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
+}
+
+/*
+ * Deprecated, use synchronize_rcu() or synchronize_sched() instead.
+ */
+void synchronize_kernel(void)
+{
+ synchronize_rcu();
+}
+
+
+EXPORT_SYMBOL_GPL(call_rcu);
+EXPORT_SYMBOL_GPL(rcu_batches_completed);
+EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
+EXPORT_SYMBOL_GPL(__synchronize_sched);
+EXPORT_SYMBOL_GPL(__rcu_read_lock);
+EXPORT_SYMBOL_GPL(__rcu_read_unlock);
+

_

2007-01-15 19:30:51

by Dipankar Sarma

[permalink] [raw]
Subject: Re: [mm PATCH 5/6] RCU: debug trace for RCU


This patch consolidates the RCU tracing code in the preemptible
RCU implementation, moves it to a separate "trace" file and
cleans up the #ifdefs. Moving the tracing code to a separate file
will eventually allow dynamic tracing of the RCU implementation.

Signed-off-by: Paul McKenney <[email protected]>
Signed-off-by: Dipankar Sarma <[email protected]>
---


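For reference, the hook pattern used throughout the diff below is a
macro that compiles away when tracing is configured out, so the fast
paths carry no cost with CONFIG_RCU_TRACE=n. A minimal sketch (the
example_* names are hypothetical; only the RCU_TRACE() definition
mirrors the patch):

#ifdef CONFIG_RCU_TRACE
#define RCU_TRACE(fn, arg)	fn(arg);
#else
#define RCU_TRACE(fn, arg)
#endif

struct example_trace {
	long events;		/* one statistic, bumped at a call site */
};
static struct example_trace example_trace;

static void example_trace_event(struct example_trace *trace)
{
	trace->events++;
}

static void example_fast_path(void)
{
	/* Expands to nothing when CONFIG_RCU_TRACE is not set. */
	RCU_TRACE(example_trace_event, &example_trace);
}

With CONFIG_RCU_TRACE=y the accumulated statistics are exported
through debugfs (typically mounted at /sys/kernel/debug) as the
rcu/rcustats, rcu/rcugp and rcu/rcuctrs files created by this patch.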

diff -puN include/linux/rcupreempt.h~rcu-preempt-trace include/linux/rcupreempt.h
--- linux-2.6.20-rc3-mm1-rcu/include/linux/rcupreempt.h~rcu-preempt-trace 2007-01-15 15:36:56.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/rcupreempt.h 2007-01-15 15:36:56.000000000 +0530
@@ -61,5 +61,15 @@ extern void __rcu_init(void);
extern void rcu_check_callbacks(int cpu, int user);
extern void rcu_restart_cpu(int cpu);

+#ifdef CONFIG_RCU_TRACE
+struct rcupreempt_trace;
+extern int *rcupreempt_flipctr(int cpu);
+extern long rcupreempt_data_completed(void);
+extern int rcupreempt_flip_flag(int cpu);
+extern int rcupreempt_mb_flag(int cpu);
+extern char *rcupreempt_try_flip_state_name(void);
+extern struct rcupreempt_trace *rcupreempt_trace(void);
+#endif
+
#endif /* __KERNEL__ */
#endif /* __LINUX_RCUPREEMPT_H */
diff -puN /dev/null include/linux/rcupreempt_trace.h
--- /dev/null 2006-03-26 18:34:52.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/rcupreempt_trace.h 2007-01-15 15:36:56.000000000 +0530
@@ -0,0 +1,102 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion (RT implementation)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright IBM Corporation, 2006
+ *
+ * Author: Paul McKenney <[email protected]>
+ *
+ * Based on the original work by Paul McKenney <[email protected]>
+ * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
+ * Papers:
+ * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
+ * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * http://lse.sourceforge.net/locking/rcupdate.html
+ *
+ */
+
+#ifndef __LINUX_RCUPREEMPT_TRACE_H
+#define __LINUX_RCUPREEMPT_TRACE_H
+
+#ifdef __KERNEL__
+#include <linux/types.h>
+#include <linux/kernel.h>
+
+#include <asm/atomic.h>
+
+/*
+ * PREEMPT_RCU data structures.
+ */
+
+struct rcupreempt_trace {
+ long next_length;
+ long next_add;
+ long wait_length;
+ long wait_add;
+ long done_length;
+ long done_add;
+ long done_remove;
+ atomic_t done_invoked;
+ long check_callbacks;
+ atomic_t try_flip1;
+ long try_flip2;
+ long try_flip3;
+ atomic_t try_flip_e1;
+ long try_flip_i1;
+ long try_flip_ie1;
+ long try_flip_g1;
+ long try_flip_a1;
+ long try_flip_ae1;
+ long try_flip_a2;
+ long try_flip_z1;
+ long try_flip_ze1;
+ long try_flip_z2;
+ long try_flip_m1;
+ long try_flip_me1;
+ long try_flip_m2;
+};
+
+#ifdef CONFIG_RCU_TRACE
+#define RCU_TRACE(fn, arg) fn(arg);
+#else
+#define RCU_TRACE(fn, arg)
+#endif
+
+extern void rcupreempt_trace_move2done(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_move2wait(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_e1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_i1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_ie1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_g1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_a1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_ae1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_a2(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_z1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_ze1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_z2(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_m1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_me1(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_try_flip_m2(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_check_callbacks(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_done_remove(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_invoke(struct rcupreempt_trace *trace);
+extern void rcupreempt_trace_next_add(struct rcupreempt_trace *trace);
+
+#endif /* __KERNEL__ */
+#endif /* __LINUX_RCUPREEMPT_TRACE_H */
diff -puN kernel/Kconfig.preempt~rcu-preempt-trace kernel/Kconfig.preempt
--- linux-2.6.20-rc3-mm1-rcu/kernel/Kconfig.preempt~rcu-preempt-trace 2007-01-15 15:36:56.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/Kconfig.preempt 2007-01-15 15:36:56.000000000 +0530
@@ -89,3 +89,14 @@ config PREEMPT_RCU
Say N if you are unsure.

endchoice
+
+config RCU_TRACE
+ bool "Enable tracing for RCU - currently stats in debugfs"
+ select DEBUG_FS
+ default y
+ help
+ This option provides tracing in RCU which presents stats
+ in debugfs for debugging RCU implementation.
+
+ Say Y here if you want to enable RCU tracing
+ Say N if you are unsure.
diff -puN kernel/Makefile~rcu-preempt-trace kernel/Makefile
--- linux-2.6.20-rc3-mm1-rcu/kernel/Makefile~rcu-preempt-trace 2007-01-15 15:36:56.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/Makefile 2007-01-15 15:36:56.000000000 +0530
@@ -48,6 +48,9 @@ obj-$(CONFIG_SECCOMP) += seccomp.o
obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o
obj-$(CONFIG_CLASSIC_RCU) += rcuclassic.o
obj-$(CONFIG_PREEMPT_RCU) += rcupreempt.o
+ifeq ($(CONFIG_PREEMPT_RCU),y)
+obj-$(CONFIG_RCU_TRACE) += rcupreempt_trace.o
+endif
obj-$(CONFIG_DEBUG_SYNCHRO_TEST) += synchro-test.o
obj-$(CONFIG_RELAY) += relay.o
obj-$(CONFIG_UTS_NS) += utsname.o
diff -puN kernel/rcupreempt.c~rcu-preempt-trace kernel/rcupreempt.c
--- linux-2.6.20-rc3-mm1-rcu/kernel/rcupreempt.c~rcu-preempt-trace 2007-01-15 15:36:56.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcupreempt.c 2007-01-15 15:36:56.000000000 +0530
@@ -50,6 +50,7 @@
#include <linux/delay.h>
#include <linux/byteorder/swabb.h>
#include <linux/cpumask.h>
+#include <linux/rcupreempt_trace.h>

/*
* PREEMPT_RCU data structures.
@@ -64,6 +65,9 @@ struct rcu_data {
struct rcu_head **waittail;
struct rcu_head *donelist;
struct rcu_head **donetail;
+#ifdef CONFIG_RCU_TRACE
+ struct rcupreempt_trace trace;
+#endif /* #ifdef CONFIG_RCU_TRACE */
};
struct rcu_ctrlblk {
spinlock_t fliplock;
@@ -87,6 +91,10 @@ enum rcu_try_flip_state {
RCU_TRY_FLIP_WAITMB /* "M" */
};
static enum rcu_try_flip_state rcu_try_flip_state = RCU_TRY_FLIP_IDLE;
+#ifdef CONFIG_RCU_TRACE
+static char *rcu_try_flip_state_names[] =
+ { "idle", "gp", "waitack", "waitzero", "waitmb" };
+#endif /* #ifdef CONFIG_RCU_TRACE */

/*
* Enum and per-CPU flag to determine when each CPU has seen
@@ -249,12 +257,14 @@ static void __rcu_advance_callbacks(void
if (rcu_data.waitlist != NULL) {
*rcu_data.donetail = rcu_data.waitlist;
rcu_data.donetail = rcu_data.waittail;
+ RCU_TRACE(rcupreempt_trace_move2done, &rcu_data.trace);
}
if (rcu_data.nextlist != NULL) {
rcu_data.waitlist = rcu_data.nextlist;
rcu_data.waittail = rcu_data.nexttail;
rcu_data.nextlist = NULL;
rcu_data.nexttail = &rcu_data.nextlist;
+ RCU_TRACE(rcupreempt_trace_move2wait, &rcu_data.trace);
} else {
rcu_data.waitlist = NULL;
rcu_data.waittail = &rcu_data.waitlist;
@@ -275,8 +285,11 @@ static void __rcu_advance_callbacks(void
*/
static int rcu_try_flip_idle(int flipctr)
{
- if (!rcu_pending(smp_processor_id()))
- return 1;
+ RCU_TRACE(rcupreempt_trace_try_flip_i1, &rcu_data.trace);
+ if (!rcu_pending(smp_processor_id())) {
+ RCU_TRACE(rcupreempt_trace_try_flip_ie1, &rcu_data.trace);
+ return 1;
+ }
return 0;
}

@@ -288,6 +301,7 @@ static int rcu_try_flip_in_gp(int flipct
{
int cpu;

+ RCU_TRACE(rcupreempt_trace_try_flip_g1, &rcu_data.trace);
/*
* Do the flip.
*/
@@ -315,15 +329,19 @@ static int rcu_try_flip_waitack(int flip
{
int cpu;

+ RCU_TRACE(rcupreempt_trace_try_flip_a1, &rcu_data.trace);
for_each_possible_cpu(cpu)
- if (per_cpu(rcu_flip_flag, cpu) != RCU_FLIP_SEEN)
- return 1;
-
+ if (per_cpu(rcu_flip_flag, cpu) != RCU_FLIP_SEEN) {
+ RCU_TRACE(rcupreempt_trace_try_flip_ae1,
+ &rcu_data.trace);
+ return 1;
+ }
/*
* Make sure our checks above don't bleed into subsequent
* waiting for the sum of the counters to reach zero.
*/
smp_mb();
+ RCU_TRACE(rcupreempt_trace_try_flip_a2, &rcu_data.trace);
return 0;
}

@@ -337,13 +355,17 @@ static int rcu_try_flip_waitzero(int fli
int lastidx = !(flipctr & 0x1);
int sum = 0;

+ RCU_TRACE(rcupreempt_trace_try_flip_z1, &rcu_data.trace);
/* Check to see if the sum of the "last" counters is zero. */

for_each_possible_cpu(cpu)
sum += per_cpu(rcu_flipctr, cpu)[lastidx];
- if (sum != 0)
+ if (sum != 0) {
+ RCU_TRACE(rcupreempt_trace_try_flip_ze1, &rcu_data.trace);
return 1;
+ }

+ RCU_TRACE(rcupreempt_trace_try_flip_z2, &rcu_data.trace);
/* Make sure we don't call for memory barriers before we see zero. */
smp_mb();

@@ -362,11 +384,16 @@ static int rcu_try_flip_waitmb(int flipc
{
int cpu;

+ RCU_TRACE(rcupreempt_trace_try_flip_m1, &rcu_data.trace);
for_each_possible_cpu(cpu)
- if (per_cpu(rcu_mb_flag, cpu) != RCU_MB_DONE)
+ if (per_cpu(rcu_mb_flag, cpu) != RCU_MB_DONE) {
+ RCU_TRACE(rcupreempt_trace_try_flip_me1,
+ &rcu_data.trace);
return 1;
+ }

smp_mb(); /* Ensure that the above checks precede any following flip. */
+ RCU_TRACE(rcupreempt_trace_try_flip_m2, &rcu_data.trace);
return 0;
}

@@ -384,8 +411,11 @@ static void rcu_try_flip(void)
long flipctr;
unsigned long oldirq;

- if (unlikely(!spin_trylock_irqsave(&rcu_ctrlblk.fliplock, oldirq)))
+ RCU_TRACE(rcupreempt_trace_try_flip1, &rcu_data.trace);
+ if (unlikely(!spin_trylock_irqsave(&rcu_ctrlblk.fliplock, oldirq))) {
+ RCU_TRACE(rcupreempt_trace_try_flip_e1, &rcu_data.trace);
return;
+ }

/*
* Take the next transition(s) through the RCU grace-period
@@ -462,6 +492,7 @@ void rcu_check_callbacks(int cpu, int us
}
}
spin_lock_irqsave(&rcu_data.lock, oldirq);
+ RCU_TRACE(rcupreempt_trace_check_callbacks, &rcu_data.trace);
__rcu_advance_callbacks();
if (rcu_data.donelist == NULL)
spin_unlock_irqrestore(&rcu_data.lock, oldirq);
@@ -514,11 +545,13 @@ static void rcu_process_callbacks(struct
}
rcu_data.donelist = NULL;
rcu_data.donetail = &rcu_data.donelist;
+ RCU_TRACE(rcupreempt_trace_done_remove, &rcu_data.trace);
spin_unlock_irqrestore(&rcu_data.lock, flags);
while (list) {
next = list->next;
list->func(list);
list = next;
+ RCU_TRACE(rcupreempt_trace_invoke, &rcu_data.trace);
}
}

@@ -533,6 +566,7 @@ void fastcall call_rcu(struct rcu_head *
__rcu_advance_callbacks();
*rcu_data.nexttail = head;
rcu_data.nexttail = &head->next;
+ RCU_TRACE(rcupreempt_trace_next_add, &rcu_data.trace);
spin_unlock_irqrestore(&rcu_data.lock, flags);
}

@@ -585,6 +619,37 @@ void synchronize_kernel(void)
synchronize_rcu();
}

+#ifdef RCU_TRACE
+int *rcupreempt_flipctr(int cpu)
+{
+ return &per_cpu(rcu_flipctr, cpu)[0];
+}
+long rcupreempt_data_completed(void)
+{
+ return rcu_data.completed;
+}
+int rcupreempt_flip_flag(int cpu)
+{
+ return per_cpu(rcu_flip_flag, cpu);
+}
+int rcupreempt_mb_flag(int cpu)
+{
+ return per_cpu(rcu_mb_flag, cpu);
+}
+char *rcupreempt_try_flip_state_name(void)
+{
+ return rcu_try_flip_state_names[rcu_try_flip_state];
+}
+struct rcupreempt_trace *rcupreempt_trace(void)
+{
+ return &rcu_data.trace;
+}
+EXPORT_SYMBOL_GPL(rcupreempt_flipctr);
+EXPORT_SYMBOL_GPL(rcupreempt_data_completed);
+EXPORT_SYMBOL_GPL(rcupreempt_flip_flag);
+EXPORT_SYMBOL_GPL(rcupreempt_mb_flag);
+EXPORT_SYMBOL_GPL(rcupreempt_try_flip_state_name);
+#endif /* #ifdef RCU_TRACE */

EXPORT_SYMBOL_GPL(call_rcu);
EXPORT_SYMBOL_GPL(rcu_batches_completed);
diff -puN /dev/null kernel/rcupreempt_trace.c
--- /dev/null 2006-03-26 18:34:52.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcupreempt_trace.c 2007-01-15 15:36:56.000000000 +0530
@@ -0,0 +1,308 @@
+/*
+ * Read-Copy Update tracing for realtime implementation
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright IBM Corporation, 2006
+ *
+ * Papers: http://www.rdrop.com/users/paulmck/RCU
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * Documentation/RCU/ *.txt
+ *
+ */
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/spinlock.h>
+#include <linux/smp.h>
+#include <linux/rcupdate.h>
+#include <linux/interrupt.h>
+#include <linux/sched.h>
+#include <asm/atomic.h>
+#include <linux/bitops.h>
+#include <linux/module.h>
+#include <linux/completion.h>
+#include <linux/moduleparam.h>
+#include <linux/percpu.h>
+#include <linux/notifier.h>
+#include <linux/rcupdate.h>
+#include <linux/cpu.h>
+#include <linux/mutex.h>
+#include <linux/rcupreempt_trace.h>
+#include <linux/debugfs.h>
+
+static struct mutex rcupreempt_trace_mutex;
+static char *rcupreempt_trace_buf;
+#define RCUPREEMPT_TRACE_BUF_SIZE 4096
+
+void rcupreempt_trace_move2done(struct rcupreempt_trace *trace)
+{
+ trace->done_length += trace->wait_length;
+ trace->done_add += trace->wait_length;
+ trace->wait_length = 0;
+}
+void rcupreempt_trace_move2wait(struct rcupreempt_trace *trace)
+{
+ trace->wait_length += trace->next_length;
+ trace->wait_add += trace->next_length;
+ trace->next_length = 0;
+}
+void rcupreempt_trace_try_flip1(struct rcupreempt_trace *trace)
+{
+ atomic_inc(&trace->try_flip1);
+}
+void rcupreempt_trace_try_flip_e1(struct rcupreempt_trace *trace)
+{
+ atomic_inc(&trace->try_flip_e1);
+}
+void rcupreempt_trace_try_flip2(struct rcupreempt_trace *trace)
+{
+ trace->try_flip2++;
+}
+void rcupreempt_trace_try_flip3(struct rcupreempt_trace *trace)
+{
+ trace->try_flip3++;
+}
+void rcupreempt_trace_try_flip_i1(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_i1++;
+}
+void rcupreempt_trace_try_flip_ie1(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_ie1++;
+}
+void rcupreempt_trace_try_flip_g1(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_g1++;
+}
+void rcupreempt_trace_try_flip_a1(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_a1++;
+}
+void rcupreempt_trace_try_flip_ae1(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_ae1++;
+}
+void rcupreempt_trace_try_flip_a2(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_a2++;
+}
+void rcupreempt_trace_try_flip_z1(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_z1++;
+}
+void rcupreempt_trace_try_flip_ze1(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_ze1++;
+}
+void rcupreempt_trace_try_flip_z2(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_z2++;
+}
+void rcupreempt_trace_try_flip_m1(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_m1++;
+}
+void rcupreempt_trace_try_flip_me1(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_me1++;
+}
+void rcupreempt_trace_try_flip_m2(struct rcupreempt_trace *trace)
+{
+ trace->try_flip_m2++;
+}
+void rcupreempt_trace_check_callbacks(struct rcupreempt_trace *trace)
+{
+ trace->check_callbacks++;
+}
+void rcupreempt_trace_done_remove(struct rcupreempt_trace *trace)
+{
+ trace->done_remove += trace->done_length;
+ trace->done_length = 0;
+}
+void rcupreempt_trace_invoke(struct rcupreempt_trace *trace)
+{
+ atomic_inc(&trace->done_invoked);
+}
+void rcupreempt_trace_next_add(struct rcupreempt_trace *trace)
+{
+ trace->next_add++;
+ trace->next_length++;
+}
+
+static ssize_t rcustats_read(struct file *filp, char __user *buffer,
+ size_t count, loff_t *ppos)
+{
+ struct rcupreempt_trace *trace = rcupreempt_trace();
+ ssize_t bcount;
+
+ mutex_lock(&rcupreempt_trace_mutex);
+ snprintf(rcupreempt_trace_buf, RCUPREEMPT_TRACE_BUF_SIZE,
+ "ggp=%ld lgp=%ld rcc=%ld\n"
+ "na=%ld nl=%ld wa=%ld wl=%ld da=%ld dl=%ld dr=%ld di=%d\n"
+ "1=%d e1=%d i1=%ld ie1=%ld g1=%ld a1=%ld ae1=%ld a2=%ld\n"
+ "z1=%ld ze1=%ld z2=%ld m1=%ld me1=%ld m2=%ld\n",
+
+ rcu_batches_completed(),
+ rcupreempt_data_completed(),
+ trace->check_callbacks,
+ trace->next_add,
+ trace->next_length,
+ trace->wait_add,
+ trace->wait_length,
+ trace->done_add,
+ trace->done_length,
+ trace->done_remove,
+ atomic_read(&trace->done_invoked),
+ atomic_read(&trace->try_flip1),
+ atomic_read(&trace->try_flip_e1),
+ trace->try_flip_i1,
+ trace->try_flip_ie1,
+ trace->try_flip_g1,
+ trace->try_flip_a1,
+ trace->try_flip_ae1,
+ trace->try_flip_a2,
+ trace->try_flip_z1,
+ trace->try_flip_ze1,
+ trace->try_flip_z2,
+ trace->try_flip_m1,
+ trace->try_flip_me1,
+ trace->try_flip_m2);
+ bcount = simple_read_from_buffer(buffer, count, ppos,
+ rcupreempt_trace_buf, strlen(rcupreempt_trace_buf));
+ mutex_unlock(&rcupreempt_trace_mutex);
+ return bcount;
+}
+
+static ssize_t rcugp_read(struct file *filp, char __user *buffer,
+ size_t count, loff_t *ppos)
+{
+ long oldgp = rcu_batches_completed();
+ ssize_t bcount;
+
+ mutex_lock(&rcupreempt_trace_mutex);
+ synchronize_rcu();
+ snprintf(rcupreempt_trace_buf, RCUPREEMPT_TRACE_BUF_SIZE,
+ "oldggp=%ld newggp=%ld\n", oldgp, rcu_batches_completed());
+ bcount = simple_read_from_buffer(buffer, count, ppos,
+ rcupreempt_trace_buf, strlen(rcupreempt_trace_buf));
+ mutex_unlock(&rcupreempt_trace_mutex);
+ return bcount;
+}
+
+static ssize_t rcuctrs_read(struct file *filp, char __user *buffer,
+ size_t count, loff_t *ppos)
+{
+ int cnt = 0;
+ int cpu;
+ int f = rcu_batches_completed() & 0x1;
+ ssize_t bcount;
+
+ mutex_lock(&rcupreempt_trace_mutex);
+
+ cnt += snprintf(&rcupreempt_trace_buf[cnt], RCUPREEMPT_TRACE_BUF_SIZE,
+ "CPU last cur F M\n");
+ for_each_online_cpu(cpu) {
+ int *flipctr = rcupreempt_flipctr(cpu);
+ cnt += snprintf(&rcupreempt_trace_buf[cnt],
+ RCUPREEMPT_TRACE_BUF_SIZE - cnt,
+ "%3d %4d %3d %d %d\n",
+ cpu,
+ flipctr[!f],
+ flipctr[f],
+ rcupreempt_flip_flag(cpu),
+ rcupreempt_mb_flag(cpu));
+ }
+ cnt += snprintf(&rcupreempt_trace_buf[cnt],
+ RCUPREEMPT_TRACE_BUF_SIZE - cnt,
+ "ggp = %ld, state = %s\n",
+ rcupreempt_data_completed(),
+ rcupreempt_try_flip_state_name());
+ cnt += snprintf(&rcupreempt_trace_buf[cnt],
+ RCUPREEMPT_TRACE_BUF_SIZE - cnt,
+ "\n");
+ bcount = simple_read_from_buffer(buffer, count, ppos,
+ rcupreempt_trace_buf, strlen(rcupreempt_trace_buf));
+ mutex_unlock(&rcupreempt_trace_mutex);
+ return bcount;
+}
+
+static struct file_operations rcustats_fops = {
+ .owner = THIS_MODULE,
+ .read = rcustats_read,
+};
+
+static struct file_operations rcugp_fops = {
+ .owner = THIS_MODULE,
+ .read = rcugp_read,
+};
+
+static struct file_operations rcuctrs_fops = {
+ .owner = THIS_MODULE,
+ .read = rcuctrs_read,
+};
+
+static struct dentry *rcudir, *statdir, *ctrsdir, *gpdir;
+static int rcupreempt_debugfs_init(void)
+{
+ rcudir = debugfs_create_dir("rcu", NULL);
+ if (!rcudir)
+ goto out;
+ statdir = debugfs_create_file("rcustats", 0444, rcudir,
+ NULL, &rcustats_fops);
+ if (!statdir)
+ goto free_out;
+
+ gpdir = debugfs_create_file("rcugp", 0444, rcudir, NULL, &rcugp_fops);
+ if (!gpdir)
+ goto free_out;
+
+ ctrsdir = debugfs_create_file("rcuctrs", 0444, rcudir,
+ NULL, &rcuctrs_fops);
+ if (!ctrsdir)
+ goto free_out;
+ return 0;
+free_out:
+ if (statdir)
+ debugfs_remove(statdir);
+ if (gpdir)
+ debugfs_remove(gpdir);
+ debugfs_remove(rcudir);
+out:
+ return 1;
+}
+
+static int __init rcupreempt_trace_init(void)
+{
+ mutex_init(&rcupreempt_trace_mutex);
+ rcupreempt_trace_buf = kmalloc(RCUPREEMPT_TRACE_BUF_SIZE, GFP_KERNEL);
+ if (!rcupreempt_trace_buf)
+ return 1;
+ return rcupreempt_debugfs_init();
+}
+
+static void __exit rcupreempt_trace_cleanup(void)
+{
+ debugfs_remove(statdir);
+ debugfs_remove(gpdir);
+ debugfs_remove(ctrsdir);
+ debugfs_remove(rcudir);
+ kfree(rcupreempt_trace_buf);
+}
+
+
+module_init(rcupreempt_trace_init);
+module_exit(rcupreempt_trace_cleanup);

_

2007-01-15 19:31:45

by Dipankar Sarma

[permalink] [raw]
Subject: Re: [mm PATCH 6/6] RCU: trivial fixes


Fix a few trivial things based on review comments.

Signed-off-by: Dipankar Sarma <[email protected]>

---



diff -puN kernel/rcupreempt.c~rcu-fix-trivials kernel/rcupreempt.c
--- linux-2.6.20-rc3-mm1-rcu/kernel/rcupreempt.c~rcu-fix-trivials 2007-01-15 15:37:00.000000000 +0530
+++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcupreempt.c 2007-01-15 15:37:00.000000000 +0530
@@ -156,7 +156,7 @@ void __rcu_read_lock(void)
local_irq_save(oldirq);

/*
- * Outermost nesting of rcu_read_lock(), so atomically
+ * Outermost nesting of rcu_read_lock(), so
* increment the current counter for the current CPU.
*/
idx = rcu_ctrlblk.completed & 0x1;
@@ -169,7 +169,7 @@ void __rcu_read_lock(void)
* Now that the per-CPU counter has been incremented, we
* are protected. We can therefore safely increment
* the nesting counter, relieving further NMIs of the
- * need to do so.
+ * need to increment the per-CPU counter.
*/
current->rcu_read_lock_nesting = nesting + 1;
barrier();

_

2007-01-16 17:47:31

by Paul E. McKenney

[permalink] [raw]
Subject: Re: [mm PATCH 1/6] RCU: split classic rcu

On Tue, Jan 16, 2007 at 12:51:32AM +0530, Dipankar Sarma wrote:
>
>
>
> This patch re-organizes the RCU code to enable multiple implementations
> of RCU. Users of RCU continues to include rcupdate.h and the
> RCU interfaces remain the same. This is in preparation for
> subsequently merging the preepmtpible RCU implementation.

Acked-by: Paul E. McKenney <[email protected]>

> Signed-off-by: Dipankar Sarma <[email protected]>
> ---
>
>
>
>
>
> diff -puN /dev/null include/linux/rcuclassic.h
> --- /dev/null 2006-03-26 18:34:52.000000000 +0530
> +++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/rcuclassic.h 2007-01-15 15:35:05.000000000 +0530
> @@ -0,0 +1,148 @@
> +/*
> + * Read-Copy Update mechanism for mutual exclusion (classic version)
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
> + *
> + * Copyright IBM Corporation, 2001
> + *
> + * Author: Dipankar Sarma <[email protected]>
> + *
> + * Based on the original work by Paul McKenney <[email protected]>
> + * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
> + * Papers:
> + * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
> + * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
> + *
> + * For detailed explanation of Read-Copy Update mechanism see -
> + * http://lse.sourceforge.net/locking/rcupdate.html
> + *
> + */
> +
> +#ifndef __LINUX_RCUCLASSIC_H
> +#define __LINUX_RCUCLASSIC_H
> +
> +#ifdef __KERNEL__
> +
> +#include <linux/cache.h>
> +#include <linux/spinlock.h>
> +#include <linux/threads.h>
> +#include <linux/percpu.h>
> +#include <linux/cpumask.h>
> +#include <linux/seqlock.h>
> +
> +
> +/* Global control variables for rcupdate callback mechanism. */
> +struct rcu_ctrlblk {
> + long cur; /* Current batch number. */
> + long completed; /* Number of the last completed batch */
> + int next_pending; /* Is the next batch already waiting? */
> +
> + int signaled;
> +
> + spinlock_t lock ____cacheline_internodealigned_in_smp;
> + cpumask_t cpumask; /* CPUs that need to switch in order */
> + /* for current batch to proceed. */
> +} ____cacheline_internodealigned_in_smp;
> +
> +/* Is batch a before batch b ? */
> +static inline int rcu_batch_before(long a, long b)
> +{
> + return (a - b) < 0;
> +}
> +
> +/* Is batch a after batch b ? */
> +static inline int rcu_batch_after(long a, long b)
> +{
> + return (a - b) > 0;
> +}
> +
> +/*
> + * Per-CPU data for Read-Copy UPdate.
> + * nxtlist - new callbacks are added here
> + * curlist - current batch for which quiescent cycle started if any
> + */
> +struct rcu_data {
> + /* 1) quiescent state handling : */
> + long quiescbatch; /* Batch # for grace period */
> + int passed_quiesc; /* User-mode/idle loop etc. */
> + int qs_pending; /* core waits for quiesc state */
> +
> + /* 2) batch handling */
> + long batch; /* Batch # for current RCU batch */
> + struct rcu_head *nxtlist;
> + struct rcu_head **nxttail;
> + long qlen; /* # of queued callbacks */
> + struct rcu_head *curlist;
> + struct rcu_head **curtail;
> + struct rcu_head *donelist;
> + struct rcu_head **donetail;
> + long blimit; /* Upper limit on a processed batch */
> + int cpu;
> +};
> +
> +DECLARE_PER_CPU(struct rcu_data, rcu_data);
> +DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);
> +
> +/*
> + * Increment the quiescent state counter.
> + * The counter is a bit degenerated: We do not need to know
> + * how many quiescent states passed, just if there was at least
> + * one since the start of the grace period. Thus just a flag.
> + */
> +static inline void rcu_qsctr_inc(int cpu)
> +{
> + struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> + rdp->passed_quiesc = 1;
> +}
> +static inline void rcu_bh_qsctr_inc(int cpu)
> +{
> + struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
> + rdp->passed_quiesc = 1;
> +}
> +
> +extern int rcu_pending(int cpu);
> +extern int rcu_needs_cpu(int cpu);
> +
> +#define __rcu_read_lock() \
> + do { \
> + preempt_disable(); \
> + __acquire(RCU); \
> + } while(0)
> +#define __rcu_read_unlock() \
> + do { \
> + __release(RCU); \
> + preempt_enable(); \
> + } while(0)
> +
> +#define __rcu_read_lock_bh() \
> + do { \
> + local_bh_disable(); \
> + __acquire(RCU_BH); \
> + } while(0)
> +#define __rcu_read_unlock_bh() \
> + do { \
> + __release(RCU_BH); \
> + local_bh_enable(); \
> + } while(0)
> +
> +#define __synchronize_sched() synchronize_rcu()
> +
> +extern void __rcu_init(void);
> +extern void rcu_check_callbacks(int cpu, int user);
> +extern void rcu_restart_cpu(int cpu);
> +extern long rcu_batches_completed(void);
> +
> +#endif /* __KERNEL__ */
> +#endif /* __LINUX_RCUCLASSIC_H */
> diff -puN include/linux/rcupdate.h~rcu-split-classic include/linux/rcupdate.h
> --- linux-2.6.20-rc3-mm1-rcu/include/linux/rcupdate.h~rcu-split-classic 2007-01-14 23:04:09.000000000 +0530
> +++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/rcupdate.h 2007-01-15 15:36:34.000000000 +0530
> @@ -15,7 +15,7 @@
> * along with this program; if not, write to the Free Software
> * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
> *
> - * Copyright (C) IBM Corporation, 2001
> + * Copyright IBM Corporation, 2001
> *
> * Author: Dipankar Sarma <[email protected]>
> *
> @@ -41,6 +41,7 @@
> #include <linux/percpu.h>
> #include <linux/cpumask.h>
> #include <linux/seqlock.h>
> +#include <linux/rcuclassic.h>
>
> /**
> * struct rcu_head - callback structure for use with RCU
> @@ -58,81 +59,6 @@ struct rcu_head {
> (ptr)->next = NULL; (ptr)->func = NULL; \
> } while (0)
>
> -
> -
> -/* Global control variables for rcupdate callback mechanism. */
> -struct rcu_ctrlblk {
> - long cur; /* Current batch number. */
> - long completed; /* Number of the last completed batch */
> - int next_pending; /* Is the next batch already waiting? */
> -
> - int signaled;
> -
> - spinlock_t lock ____cacheline_internodealigned_in_smp;
> - cpumask_t cpumask; /* CPUs that need to switch in order */
> - /* for current batch to proceed. */
> -} ____cacheline_internodealigned_in_smp;
> -
> -/* Is batch a before batch b ? */
> -static inline int rcu_batch_before(long a, long b)
> -{
> - return (a - b) < 0;
> -}
> -
> -/* Is batch a after batch b ? */
> -static inline int rcu_batch_after(long a, long b)
> -{
> - return (a - b) > 0;
> -}
> -
> -/*
> - * Per-CPU data for Read-Copy UPdate.
> - * nxtlist - new callbacks are added here
> - * curlist - current batch for which quiescent cycle started if any
> - */
> -struct rcu_data {
> - /* 1) quiescent state handling : */
> - long quiescbatch; /* Batch # for grace period */
> - int passed_quiesc; /* User-mode/idle loop etc. */
> - int qs_pending; /* core waits for quiesc state */
> -
> - /* 2) batch handling */
> - long batch; /* Batch # for current RCU batch */
> - struct rcu_head *nxtlist;
> - struct rcu_head **nxttail;
> - long qlen; /* # of queued callbacks */
> - struct rcu_head *curlist;
> - struct rcu_head **curtail;
> - struct rcu_head *donelist;
> - struct rcu_head **donetail;
> - long blimit; /* Upper limit on a processed batch */
> - int cpu;
> - struct rcu_head barrier;
> -};
> -
> -DECLARE_PER_CPU(struct rcu_data, rcu_data);
> -DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);
> -
> -/*
> - * Increment the quiescent state counter.
> - * The counter is a bit degenerated: We do not need to know
> - * how many quiescent states passed, just if there was at least
> - * one since the start of the grace period. Thus just a flag.
> - */
> -static inline void rcu_qsctr_inc(int cpu)
> -{
> - struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> - rdp->passed_quiesc = 1;
> -}
> -static inline void rcu_bh_qsctr_inc(int cpu)
> -{
> - struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
> - rdp->passed_quiesc = 1;
> -}
> -
> -extern int rcu_pending(int cpu);
> -extern int rcu_needs_cpu(int cpu);
> -
> /**
> * rcu_read_lock - mark the beginning of an RCU read-side critical section.
> *
> @@ -162,22 +88,14 @@ extern int rcu_needs_cpu(int cpu);
> *
> * It is illegal to block while in an RCU read-side critical section.
> */
> -#define rcu_read_lock() \
> - do { \
> - preempt_disable(); \
> - __acquire(RCU); \
> - } while(0)
> +#define rcu_read_lock() __rcu_read_lock()
>
> /**
> * rcu_read_unlock - marks the end of an RCU read-side critical section.
> *
> * See rcu_read_lock() for more information.
> */
> -#define rcu_read_unlock() \
> - do { \
> - __release(RCU); \
> - preempt_enable(); \
> - } while(0)
> +#define rcu_read_unlock() __rcu_read_unlock()
>
> /*
> * So where is rcu_write_lock()? It does not exist, as there is no
> @@ -200,23 +118,15 @@ extern int rcu_needs_cpu(int cpu);
> * can use just rcu_read_lock().
> *
> */
> -#define rcu_read_lock_bh() \
> - do { \
> - local_bh_disable(); \
> - __acquire(RCU_BH); \
> - } while(0)
> -
> -/*
> +#define rcu_read_lock_bh() __rcu_read_lock_bh()
> +
> +/**
> * rcu_read_unlock_bh - marks the end of a softirq-only RCU critical section
> *
> * See rcu_read_lock_bh() for more information.
> */
> -#define rcu_read_unlock_bh() \
> - do { \
> - __release(RCU_BH); \
> - local_bh_enable(); \
> - } while(0)
> -
> +#define rcu_read_unlock_bh() __rcu_read_unlock_bh()
> +
> /**
> * rcu_dereference - fetch an RCU-protected pointer in an
> * RCU read-side critical section. This pointer may later
> @@ -267,22 +177,49 @@ extern int rcu_needs_cpu(int cpu);
> * In "classic RCU", these two guarantees happen to be one and
> * the same, but can differ in realtime RCU implementations.
> */
> -#define synchronize_sched() synchronize_rcu()
> +#define synchronize_sched() __synchronize_sched()
> +
> +/**
> + * call_rcu - Queue an RCU callback for invocation after a grace period.
> + * @head: structure to be used for queueing the RCU updates.
> + * @func: actual update function to be invoked after the grace period
> + *
> + * The update function will be invoked some time after a full grace
> + * period elapses, in other words after all currently executing RCU
> + * read-side critical sections have completed. RCU read-side critical
> + * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
> + * and may be nested.
> + */
> +extern void FASTCALL(call_rcu(struct rcu_head *head,
> + void (*func)(struct rcu_head *head)));
>
> -extern void rcu_init(void);
> -extern void rcu_check_callbacks(int cpu, int user);
> -extern void rcu_restart_cpu(int cpu);
> -extern long rcu_batches_completed(void);
> -extern long rcu_batches_completed_bh(void);
>
> -/* Exported interfaces */
> -extern void FASTCALL(call_rcu(struct rcu_head *head,
> - void (*func)(struct rcu_head *head)));
> +/**
> + * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
> + * @head: structure to be used for queueing the RCU updates.
> + * @func: actual update function to be invoked after the grace period
> + *
> + * The update function will be invoked some time after a full grace
> + * period elapses, in other words after all currently executing RCU
> + * read-side critical sections have completed. call_rcu_bh() assumes
> + * that the read-side critical sections end on completion of a softirq
> + * handler. This means that read-side critical sections in process
> + * context must not be interrupted by softirqs. This interface is to be
> + * used when most of the read-side critical sections are in softirq context.
> + * RCU read-side critical sections are delimited by rcu_read_lock() and
> + * rcu_read_unlock(), * if in interrupt context or rcu_read_lock_bh()
> + * and rcu_read_unlock_bh(), if in process context. These may be nested.
> + */
> extern void FASTCALL(call_rcu_bh(struct rcu_head *head,
> void (*func)(struct rcu_head *head)));
> +
> +/* Exported common interfaces */
> extern void synchronize_rcu(void);
> -void synchronize_idle(void);
> extern void rcu_barrier(void);
> +
> +/* Internal to kernel */
> +extern void rcu_init(void);
> +extern void rcu_check_callbacks(int cpu, int user);
>
> #endif /* __KERNEL__ */
> #endif /* __LINUX_RCUPDATE_H */
> diff -puN kernel/Makefile~rcu-split-classic kernel/Makefile
> --- linux-2.6.20-rc3-mm1-rcu/kernel/Makefile~rcu-split-classic 2007-01-14 23:04:09.000000000 +0530
> +++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/Makefile 2007-01-15 15:34:21.000000000 +0530
> @@ -6,7 +6,7 @@ obj-y = sched.o fork.o exec_domain.o
> exit.o itimer.o time.o softirq.o resource.o \
> sysctl.o capability.o ptrace.o timer.o user.o user_namespace.o \
> signal.o sys.o kmod.o workqueue.o pid.o \
> - rcupdate.o extable.o params.o posix-timers.o \
> + rcupdate.o rcuclassic.o extable.o params.o posix-timers.o \
> kthread.o wait.o kfifo.o sys_ni.o posix-cpu-timers.o mutex.o \
> hrtimer.o rwsem.o latency.o nsproxy.o srcu.o
>
> diff -puN /dev/null kernel/rcuclassic.c
> --- /dev/null 2006-03-26 18:34:52.000000000 +0530
> +++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcuclassic.c 2007-01-15 15:34:47.000000000 +0530
> @@ -0,0 +1,558 @@
> +/*
> + * Read-Copy Update mechanism for mutual exclusion, classic implementation
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
> + *
> + * Copyright IBM Corporation, 2001
> + *
> + * Authors: Dipankar Sarma <[email protected]>
> + * Manfred Spraul <[email protected]>
> + *
> + * Based on the original work by Paul McKenney <[email protected]>
> + * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
> + *
> + * Papers: http://www.rdrop.com/users/paulmck/RCU
> + *
> + * For detailed explanation of Read-Copy Update mechanism see -
> + * Documentation/RCU/ *.txt
> + *
> + */
> +#include <linux/types.h>
> +#include <linux/kernel.h>
> +#include <linux/init.h>
> +#include <linux/spinlock.h>
> +#include <linux/smp.h>
> +#include <linux/rcupdate.h>
> +#include <linux/interrupt.h>
> +#include <linux/sched.h>
> +#include <asm/atomic.h>
> +#include <linux/bitops.h>
> +#include <linux/module.h>
> +#include <linux/completion.h>
> +#include <linux/moduleparam.h>
> +#include <linux/percpu.h>
> +#include <linux/notifier.h>
> +#include <linux/rcupdate.h>
> +#include <linux/cpu.h>
> +#include <linux/random.h>
> +#include <linux/delay.h>
> +#include <linux/byteorder/swabb.h>
> +
> +
> +/* Definition for rcupdate control block. */
> +static struct rcu_ctrlblk rcu_ctrlblk = {
> + .cur = -300,
> + .completed = -300,
> + .lock = __SPIN_LOCK_UNLOCKED(&rcu_ctrlblk.lock),
> + .cpumask = CPU_MASK_NONE,
> +};
> +static struct rcu_ctrlblk rcu_bh_ctrlblk = {
> + .cur = -300,
> + .completed = -300,
> + .lock = __SPIN_LOCK_UNLOCKED(&rcu_bh_ctrlblk.lock),
> + .cpumask = CPU_MASK_NONE,
> +};
> +
> +DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
> +DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
> +
> +/* Fake initialization required by compiler */
> +static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
> +static int blimit = 10;
> +static int qhimark = 10000;
> +static int qlowmark = 100;
> +
> +#ifdef CONFIG_SMP
> +static void force_quiescent_state(struct rcu_data *rdp,
> + struct rcu_ctrlblk *rcp)
> +{
> + int cpu;
> + cpumask_t cpumask;
> + set_need_resched();
> + if (unlikely(!rcp->signaled)) {
> + rcp->signaled = 1;
> + /*
> + * Don't send IPI to itself. With irqs disabled,
> + * rdp->cpu is the current cpu.
> + */
> + cpumask = rcp->cpumask;
> + cpu_clear(rdp->cpu, cpumask);
> + for_each_cpu_mask(cpu, cpumask)
> + smp_send_reschedule(cpu);
> + }
> +}
> +#else
> +static inline void force_quiescent_state(struct rcu_data *rdp,
> + struct rcu_ctrlblk *rcp)
> +{
> + set_need_resched();
> +}
> +#endif
> +
> +/**
> + * call_rcu - Queue an RCU callback for invocation after a grace period.
> + * @head: structure to be used for queueing the RCU updates.
> + * @func: actual update function to be invoked after the grace period
> + *
> + * The update function will be invoked some time after a full grace
> + * period elapses, in other words after all currently executing RCU
> + * read-side critical sections have completed. RCU read-side critical
> + * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
> + * and may be nested.
> + */
> +void fastcall call_rcu(struct rcu_head *head,
> + void (*func)(struct rcu_head *rcu))
> +{
> + unsigned long flags;
> + struct rcu_data *rdp;
> +
> + head->func = func;
> + head->next = NULL;
> + local_irq_save(flags);
> + rdp = &__get_cpu_var(rcu_data);
> + *rdp->nxttail = head;
> + rdp->nxttail = &head->next;
> + if (unlikely(++rdp->qlen > qhimark)) {
> + rdp->blimit = INT_MAX;
> + force_quiescent_state(rdp, &rcu_ctrlblk);
> + }
> + local_irq_restore(flags);
> +}
> +
> +/**
> + * call_rcu_bh - Queue an RCU callback for invocation after a quicker grace period.
> + * @head: structure to be used for queueing the RCU updates.
> + * @func: actual update function to be invoked after the grace period
> + *
> + * The update function will be invoked some time after a full grace
> + * period elapses, in other words after all currently executing RCU
> + * read-side critical sections have completed. call_rcu_bh() assumes
> + * that the read-side critical sections end on completion of a softirq
> + * handler. This means that read-side critical sections in process
> + * context must not be interrupted by softirqs. This interface is to be
> + * used when most of the read-side critical sections are in softirq context.
> + * RCU read-side critical sections are delimited by rcu_read_lock() and
> + * rcu_read_unlock(), if in interrupt context, or rcu_read_lock_bh()
> + * and rcu_read_unlock_bh(), if in process context. These may be nested.
> + */
> +void fastcall call_rcu_bh(struct rcu_head *head,
> + void (*func)(struct rcu_head *rcu))
> +{
> + unsigned long flags;
> + struct rcu_data *rdp;
> +
> + head->func = func;
> + head->next = NULL;
> + local_irq_save(flags);
> + rdp = &__get_cpu_var(rcu_bh_data);
> + *rdp->nxttail = head;
> + rdp->nxttail = &head->next;
> +
> + if (unlikely(++rdp->qlen > qhimark)) {
> + rdp->blimit = INT_MAX;
> + force_quiescent_state(rdp, &rcu_bh_ctrlblk);
> + }
> +
> + local_irq_restore(flags);
> +}
> +
> +/*
> + * Return the number of RCU batches processed thus far. Useful
> + * for debug and statistics.
> + */
> +long rcu_batches_completed(void)
> +{
> + return rcu_ctrlblk.completed;
> +}
> +
> +/*
> + * Return the number of RCU batches processed thus far. Useful
> + * for debug and statistics.
> + */
> +long rcu_batches_completed_bh(void)
> +{
> + return rcu_bh_ctrlblk.completed;
> +}
> +
> +/*
> + * Invoke the completed RCU callbacks. They are expected to be in
> + * a per-cpu list.
> + */
> +static void rcu_do_batch(struct rcu_data *rdp)
> +{
> + struct rcu_head *next, *list;
> + int count = 0;
> +
> + list = rdp->donelist;
> + while (list) {
> + next = list->next;
> + prefetch(next);
> + list->func(list);
> + list = next;
> + if (++count >= rdp->blimit)
> + break;
> + }
> + rdp->donelist = list;
> +
> + local_irq_disable();
> + rdp->qlen -= count;
> + local_irq_enable();
> + if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
> + rdp->blimit = blimit;
> +
> + if (!rdp->donelist)
> + rdp->donetail = &rdp->donelist;
> + else
> + tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
> +}
> +
> +/*
> + * Grace period handling:
> + * The grace period handling consists of two steps:
> + * - A new grace period is started.
> + * This is done by rcu_start_batch. The start is not broadcasted to
> + * all cpus, they must pick this up by comparing rcp->cur with
> + * rdp->quiescbatch. All cpus are recorded in the
> + * rcu_ctrlblk.cpumask bitmap.
> + * - All cpus must go through a quiescent state.
> + * Since the start of the grace period is not broadcasted, at least two
> + * calls to rcu_check_quiescent_state are required:
> + * The first call just notices that a new grace period is running. The
> + * following calls check if there was a quiescent state since the beginning
> + * of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
> + * the bitmap is empty, then the grace period is completed.
> + * rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
> + * period (if necessary).
> + */
> +/*
> + * Register a new batch of callbacks, and start it up if there is currently no
> + * active batch and the batch to be registered has not already occurred.
> + * Caller must hold rcu_ctrlblk.lock.
> + */
> +static void rcu_start_batch(struct rcu_ctrlblk *rcp)
> +{
> + if (rcp->next_pending &&
> + rcp->completed == rcp->cur) {
> + rcp->next_pending = 0;
> + /*
> + * next_pending == 0 must be visible in
> + * __rcu_process_callbacks() before it can see new value of cur.
> + */
> + smp_wmb();
> + rcp->cur++;
> +
> + /*
> + * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
> + * barrier. Otherwise it can cause tickless idle CPUs to be
> + * included in rcp->cpumask, which will extend grace periods
> + * unnecessarily.
> + */
> + smp_mb();
> + cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
> +
> + rcp->signaled = 0;
> + }
> +}
> +
> +/*
> + * cpu went through a quiescent state since the beginning of the grace period.
> + * Clear it from the cpu mask and complete the grace period if it was the last
> + * cpu. Start another grace period if someone has further entries pending
> + */
> +static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
> +{
> + cpu_clear(cpu, rcp->cpumask);
> + if (cpus_empty(rcp->cpumask)) {
> + /* batch completed ! */
> + rcp->completed = rcp->cur;
> + rcu_start_batch(rcp);
> + }
> +}
> +
> +/*
> + * Check if the cpu has gone through a quiescent state (say context
> + * switch). If so, and if it hasn't already done so in this RCU
> + * quiescent cycle, then indicate that it has done so.
> + */
> +static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
> + struct rcu_data *rdp)
> +{
> + if (rdp->quiescbatch != rcp->cur) {
> + /* start new grace period: */
> + rdp->qs_pending = 1;
> + rdp->passed_quiesc = 0;
> + rdp->quiescbatch = rcp->cur;
> + return;
> + }
> +
> + /* Grace period already completed for this cpu?
> + * qs_pending is checked instead of the actual bitmap to avoid
> + * cacheline thrashing.
> + */
> + if (!rdp->qs_pending)
> + return;
> +
> + /*
> + * Was there a quiescent state since the beginning of the grace
> + * period? If no, then exit and wait for the next call.
> + */
> + if (!rdp->passed_quiesc)
> + return;
> + rdp->qs_pending = 0;
> +
> + spin_lock(&rcp->lock);
> + /*
> + * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
> + * during cpu startup. Ignore the quiescent state.
> + */
> + if (likely(rdp->quiescbatch == rcp->cur))
> + cpu_quiet(rdp->cpu, rcp);
> +
> + spin_unlock(&rcp->lock);
> +}
> +
> +
> +#ifdef CONFIG_HOTPLUG_CPU
> +
> +/* warning! helper for rcu_offline_cpu. do not use elsewhere without reviewing
> + * locking requirements, the list it's pulling from has to belong to a cpu
> + * which is dead and hence not processing interrupts.
> + */
> +static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
> + struct rcu_head **tail)
> +{
> + local_irq_disable();
> + *this_rdp->nxttail = list;
> + if (list)
> + this_rdp->nxttail = tail;
> + local_irq_enable();
> +}
> +
> +static void __rcu_offline_cpu(struct rcu_data *this_rdp,
> + struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> +{
> + /* if the cpu going offline owns the grace period
> + * we can block indefinitely waiting for it, so flush
> + * it here
> + */
> + spin_lock_bh(&rcp->lock);
> + if (rcp->cur != rcp->completed)
> + cpu_quiet(rdp->cpu, rcp);
> + spin_unlock_bh(&rcp->lock);
> + rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
> + rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
> + rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
> +}
> +
> +static void rcu_offline_cpu(int cpu)
> +{
> + struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
> + struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
> +
> + __rcu_offline_cpu(this_rdp, &rcu_ctrlblk,
> + &per_cpu(rcu_data, cpu));
> + __rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk,
> + &per_cpu(rcu_bh_data, cpu));
> + put_cpu_var(rcu_data);
> + put_cpu_var(rcu_bh_data);
> + tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
> +}
> +
> +#else
> +
> +static void rcu_offline_cpu(int cpu)
> +{
> +}
> +
> +#endif
> +
> +/*
> + * This does the RCU processing work from tasklet context.
> + */
> +static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
> + struct rcu_data *rdp)
> +{
> + if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
> + *rdp->donetail = rdp->curlist;
> + rdp->donetail = rdp->curtail;
> + rdp->curlist = NULL;
> + rdp->curtail = &rdp->curlist;
> + }
> +
> + if (rdp->nxtlist && !rdp->curlist) {
> + local_irq_disable();
> + rdp->curlist = rdp->nxtlist;
> + rdp->curtail = rdp->nxttail;
> + rdp->nxtlist = NULL;
> + rdp->nxttail = &rdp->nxtlist;
> + local_irq_enable();
> +
> + /*
> + * start the next batch of callbacks
> + */
> +
> + /* determine batch number */
> + rdp->batch = rcp->cur + 1;
> + /* see the comment and corresponding wmb() in
> + * the rcu_start_batch()
> + */
> + smp_rmb();
> +
> + if (!rcp->next_pending) {
> + /* and start it/schedule start if it's a new batch */
> + spin_lock(&rcp->lock);
> + rcp->next_pending = 1;
> + rcu_start_batch(rcp);
> + spin_unlock(&rcp->lock);
> + }
> + }
> +
> + rcu_check_quiescent_state(rcp, rdp);
> + if (rdp->donelist)
> + rcu_do_batch(rdp);
> +}
> +
> +static void rcu_process_callbacks(unsigned long unused)
> +{
> + __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
> + __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> +}
> +
> +static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> +{
> + /* This cpu has pending rcu entries and the grace period
> + * for them has completed.
> + */
> + if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
> + return 1;
> +
> + /* This cpu has no pending entries, but there are new entries */
> + if (!rdp->curlist && rdp->nxtlist)
> + return 1;
> +
> + /* This cpu has finished callbacks to invoke */
> + if (rdp->donelist)
> + return 1;
> +
> + /* The rcu core waits for a quiescent state from the cpu */
> + if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
> + return 1;
> +
> + /* nothing to do */
> + return 0;
> +}
> +
> +/*
> + * Check to see if there is any immediate RCU-related work to be done
> + * by the current CPU, returning 1 if so. This function is part of the
> + * RCU implementation; it is -not- an exported member of the RCU API.
> + */
> +int rcu_pending(int cpu)
> +{
> + return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
> + __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
> +}
> +
> +/*
> + * Check to see if any future RCU-related work will need to be done
> + * by the current CPU, even if none need be done immediately, returning
> + * 1 if so. This function is part of the RCU implementation; it is -not-
> + * an exported member of the RCU API.
> + */
> +int rcu_needs_cpu(int cpu)
> +{
> + struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> + struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
> +
> + return (!!rdp->curlist || !!rdp_bh->curlist || rcu_pending(cpu));
> +}
> +
> +void rcu_check_callbacks(int cpu, int user)
> +{
> + if (user ||
> + (idle_cpu(cpu) && !in_softirq() &&
> + hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
> + rcu_qsctr_inc(cpu);
> + rcu_bh_qsctr_inc(cpu);
> + } else if (!in_softirq())
> + rcu_bh_qsctr_inc(cpu);
> + tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
> +}
> +
> +static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
> + struct rcu_data *rdp)
> +{
> + memset(rdp, 0, sizeof(*rdp));
> + rdp->curtail = &rdp->curlist;
> + rdp->nxttail = &rdp->nxtlist;
> + rdp->donetail = &rdp->donelist;
> + rdp->quiescbatch = rcp->completed;
> + rdp->qs_pending = 0;
> + rdp->cpu = cpu;
> + rdp->blimit = blimit;
> +}
> +
> +static void __devinit rcu_online_cpu(int cpu)
> +{
> + struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> + struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
> +
> + rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
> + rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
> + tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
> +}
> +
> +static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
> + unsigned long action, void *hcpu)
> +{
> + long cpu = (long)hcpu;
> + switch (action) {
> + case CPU_UP_PREPARE:
> + rcu_online_cpu(cpu);
> + break;
> + case CPU_DEAD:
> + rcu_offline_cpu(cpu);
> + break;
> + default:
> + break;
> + }
> + return NOTIFY_OK;
> +}
> +
> +static struct notifier_block __cpuinitdata rcu_nb = {
> + .notifier_call = rcu_cpu_notify,
> +};
> +
> +/*
> + * Initializes the RCU mechanism. Assumed to be called early,
> + * that is, before the local timer (SMP) or jiffies timer (uniproc) is set up.
> + * Note that rcu_qsctr and friends are implicitly
> + * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
> + */
> +void __init __rcu_init(void)
> +{
> + rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
> + (void *)(long)smp_processor_id());
> + /* Register notifier for non-boot CPUs */
> + register_cpu_notifier(&rcu_nb);
> +}
> +
> +module_param(blimit, int, 0);
> +module_param(qhimark, int, 0);
> +module_param(qlowmark, int, 0);
> +EXPORT_SYMBOL_GPL(rcu_batches_completed);
> +EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
> +EXPORT_SYMBOL_GPL(call_rcu);
> +EXPORT_SYMBOL_GPL(call_rcu_bh);
> diff -puN kernel/rcupdate.c~rcu-split-classic kernel/rcupdate.c
> --- linux-2.6.20-rc3-mm1-rcu/kernel/rcupdate.c~rcu-split-classic 2007-01-14 23:04:09.000000000 +0530
> +++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcupdate.c 2007-01-15 15:36:09.000000000 +0530
> @@ -15,7 +15,7 @@
> * along with this program; if not, write to the Free Software
> * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
> *
> - * Copyright (C) IBM Corporation, 2001
> + * Copyright IBM Corporation, 2001
> *
> * Authors: Dipankar Sarma <[email protected]>
> * Manfred Spraul <[email protected]>
> @@ -35,157 +35,58 @@
> #include <linux/init.h>
> #include <linux/spinlock.h>
> #include <linux/smp.h>
> -#include <linux/rcupdate.h>
> #include <linux/interrupt.h>
> #include <linux/sched.h>
> #include <asm/atomic.h>
> #include <linux/bitops.h>
> -#include <linux/module.h>
> #include <linux/completion.h>
> -#include <linux/moduleparam.h>
> #include <linux/percpu.h>
> -#include <linux/notifier.h>
> #include <linux/rcupdate.h>
> #include <linux/cpu.h>
> #include <linux/mutex.h>
> +#include <linux/module.h>
>
> -/* Definition for rcupdate control block. */
> -static struct rcu_ctrlblk rcu_ctrlblk = {
> - .cur = -300,
> - .completed = -300,
> - .lock = __SPIN_LOCK_UNLOCKED(&rcu_ctrlblk.lock),
> - .cpumask = CPU_MASK_NONE,
> -};
> -static struct rcu_ctrlblk rcu_bh_ctrlblk = {
> - .cur = -300,
> - .completed = -300,
> - .lock = __SPIN_LOCK_UNLOCKED(&rcu_bh_ctrlblk.lock),
> - .cpumask = CPU_MASK_NONE,
> +struct rcu_synchronize {
> + struct rcu_head head;
> + struct completion completion;
> };
>
> -DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
> -DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
> -
> -/* Fake initialization required by compiler */
> -static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
> -static int blimit = 10;
> -static int qhimark = 10000;
> -static int qlowmark = 100;
> -
> +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head);
> static atomic_t rcu_barrier_cpu_count;
> static DEFINE_MUTEX(rcu_barrier_mutex);
> static struct completion rcu_barrier_completion;
>
> -#ifdef CONFIG_SMP
> -static void force_quiescent_state(struct rcu_data *rdp,
> - struct rcu_ctrlblk *rcp)
> -{
> - int cpu;
> - cpumask_t cpumask;
> - set_need_resched();
> - if (unlikely(!rcp->signaled)) {
> - rcp->signaled = 1;
> - /*
> - * Don't send IPI to itself. With irqs disabled,
> - * rdp->cpu is the current cpu.
> - */
> - cpumask = rcp->cpumask;
> - cpu_clear(rdp->cpu, cpumask);
> - for_each_cpu_mask(cpu, cpumask)
> - smp_send_reschedule(cpu);
> - }
> -}
> -#else
> -static inline void force_quiescent_state(struct rcu_data *rdp,
> - struct rcu_ctrlblk *rcp)
> +/* Because of FASTCALL declaration of complete, we use this wrapper */
> +static void wakeme_after_rcu(struct rcu_head *head)
> {
> - set_need_resched();
> + struct rcu_synchronize *rcu;
> +
> + rcu = container_of(head, struct rcu_synchronize, head);
> + complete(&rcu->completion);
> }
> -#endif
>
> /**
> - * call_rcu - Queue an RCU callback for invocation after a grace period.
> - * @head: structure to be used for queueing the RCU updates.
> - * @func: actual update function to be invoked after the grace period
> + * synchronize_rcu - wait until a grace period has elapsed.
> *
> - * The update function will be invoked some time after a full grace
> - * period elapses, in other words after all currently executing RCU
> + * Control will return to the caller some time after a full grace
> + * period has elapsed, in other words after all currently executing RCU
> * read-side critical sections have completed. RCU read-side critical
> * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
> * and may be nested.
> - */
> -void fastcall call_rcu(struct rcu_head *head,
> - void (*func)(struct rcu_head *rcu))
> -{
> - unsigned long flags;
> - struct rcu_data *rdp;
> -
> - head->func = func;
> - head->next = NULL;
> - local_irq_save(flags);
> - rdp = &__get_cpu_var(rcu_data);
> - *rdp->nxttail = head;
> - rdp->nxttail = &head->next;
> - if (unlikely(++rdp->qlen > qhimark)) {
> - rdp->blimit = INT_MAX;
> - force_quiescent_state(rdp, &rcu_ctrlblk);
> - }
> - local_irq_restore(flags);
> -}
> -
> -/**
> - * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
> - * @head: structure to be used for queueing the RCU updates.
> - * @func: actual update function to be invoked after the grace period
> *
> - * The update function will be invoked some time after a full grace
> - * period elapses, in other words after all currently executing RCU
> - * read-side critical sections have completed. call_rcu_bh() assumes
> - * that the read-side critical sections end on completion of a softirq
> - * handler. This means that read-side critical sections in process
> - * context must not be interrupted by softirqs. This interface is to be
> - * used when most of the read-side critical sections are in softirq context.
> - * RCU read-side critical sections are delimited by rcu_read_lock() and
> - * rcu_read_unlock(), * if in interrupt context or rcu_read_lock_bh()
> - * and rcu_read_unlock_bh(), if in process context. These may be nested.
> + * If your read-side code is not protected by rcu_read_lock(), do -not-
> + * use synchronize_rcu().
> */
> -void fastcall call_rcu_bh(struct rcu_head *head,
> - void (*func)(struct rcu_head *rcu))
> +void synchronize_rcu(void)
> {
> - unsigned long flags;
> - struct rcu_data *rdp;
> -
> - head->func = func;
> - head->next = NULL;
> - local_irq_save(flags);
> - rdp = &__get_cpu_var(rcu_bh_data);
> - *rdp->nxttail = head;
> - rdp->nxttail = &head->next;
> -
> - if (unlikely(++rdp->qlen > qhimark)) {
> - rdp->blimit = INT_MAX;
> - force_quiescent_state(rdp, &rcu_bh_ctrlblk);
> - }
> -
> - local_irq_restore(flags);
> -}
> + struct rcu_synchronize rcu;
>
> -/*
> - * Return the number of RCU batches processed thus far. Useful
> - * for debug and statistics.
> - */
> -long rcu_batches_completed(void)
> -{
> - return rcu_ctrlblk.completed;
> -}
> + init_completion(&rcu.completion);
> + /* Will wake me after RCU finished */
> + call_rcu(&rcu.head, wakeme_after_rcu);
>
> -/*
> - * Return the number of RCU batches processed thus far. Useful
> - * for debug and statistics.
> - */
> -long rcu_batches_completed_bh(void)
> -{
> - return rcu_bh_ctrlblk.completed;
> + /* Wait for it */
> + wait_for_completion(&rcu.completion);
> }
>
> static void rcu_barrier_callback(struct rcu_head *notused)
> @@ -200,10 +101,8 @@ static void rcu_barrier_callback(struct
> static void rcu_barrier_func(void *notused)
> {
> int cpu = smp_processor_id();
> - struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> - struct rcu_head *head;
> + struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
>
> - head = &rdp->barrier;
> atomic_inc(&rcu_barrier_cpu_count);
> call_rcu(head, rcu_barrier_callback);
> }
> @@ -222,414 +121,11 @@ void rcu_barrier(void)
> wait_for_completion(&rcu_barrier_completion);
> mutex_unlock(&rcu_barrier_mutex);
> }
> -EXPORT_SYMBOL_GPL(rcu_barrier);
> -
> -/*
> - * Invoke the completed RCU callbacks. They are expected to be in
> - * a per-cpu list.
> - */
> -static void rcu_do_batch(struct rcu_data *rdp)
> -{
> - struct rcu_head *next, *list;
> - int count = 0;
> -
> - list = rdp->donelist;
> - while (list) {
> - next = list->next;
> - prefetch(next);
> - list->func(list);
> - list = next;
> - if (++count >= rdp->blimit)
> - break;
> - }
> - rdp->donelist = list;
> -
> - local_irq_disable();
> - rdp->qlen -= count;
> - local_irq_enable();
> - if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
> - rdp->blimit = blimit;
> -
> - if (!rdp->donelist)
> - rdp->donetail = &rdp->donelist;
> - else
> - tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
> -}
> -
> -/*
> - * Grace period handling:
> - * The grace period handling consists out of two steps:
> - * - A new grace period is started.
> - * This is done by rcu_start_batch. The start is not broadcasted to
> - * all cpus, they must pick this up by comparing rcp->cur with
> - * rdp->quiescbatch. All cpus are recorded in the
> - * rcu_ctrlblk.cpumask bitmap.
> - * - All cpus must go through a quiescent state.
> - * Since the start of the grace period is not broadcasted, at least two
> - * calls to rcu_check_quiescent_state are required:
> - * The first call just notices that a new grace period is running. The
> - * following calls check if there was a quiescent state since the beginning
> - * of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
> - * the bitmap is empty, then the grace period is completed.
> - * rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
> - * period (if necessary).
> - */
> -/*
> - * Register a new batch of callbacks, and start it up if there is currently no
> - * active batch and the batch to be registered has not already occurred.
> - * Caller must hold rcu_ctrlblk.lock.
> - */
> -static void rcu_start_batch(struct rcu_ctrlblk *rcp)
> -{
> - if (rcp->next_pending &&
> - rcp->completed == rcp->cur) {
> - rcp->next_pending = 0;
> - /*
> - * next_pending == 0 must be visible in
> - * __rcu_process_callbacks() before it can see new value of cur.
> - */
> - smp_wmb();
> - rcp->cur++;
> -
> - /*
> - * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
> - * Barrier Otherwise it can cause tickless idle CPUs to be
> - * included in rcp->cpumask, which will extend graceperiods
> - * unnecessarily.
> - */
> - smp_mb();
> - cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
> -
> - rcp->signaled = 0;
> - }
> -}
> -
> -/*
> - * cpu went through a quiescent state since the beginning of the grace period.
> - * Clear it from the cpu mask and complete the grace period if it was the last
> - * cpu. Start another grace period if someone has further entries pending
> - */
> -static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
> -{
> - cpu_clear(cpu, rcp->cpumask);
> - if (cpus_empty(rcp->cpumask)) {
> - /* batch completed ! */
> - rcp->completed = rcp->cur;
> - rcu_start_batch(rcp);
> - }
> -}
> -
> -/*
> - * Check if the cpu has gone through a quiescent state (say context
> - * switch). If so and if it already hasn't done so in this RCU
> - * quiescent cycle, then indicate that it has done so.
> - */
> -static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
> - struct rcu_data *rdp)
> -{
> - if (rdp->quiescbatch != rcp->cur) {
> - /* start new grace period: */
> - rdp->qs_pending = 1;
> - rdp->passed_quiesc = 0;
> - rdp->quiescbatch = rcp->cur;
> - return;
> - }
> -
> - /* Grace period already completed for this cpu?
> - * qs_pending is checked instead of the actual bitmap to avoid
> - * cacheline trashing.
> - */
> - if (!rdp->qs_pending)
> - return;
> -
> - /*
> - * Was there a quiescent state since the beginning of the grace
> - * period? If no, then exit and wait for the next call.
> - */
> - if (!rdp->passed_quiesc)
> - return;
> - rdp->qs_pending = 0;
> -
> - spin_lock(&rcp->lock);
> - /*
> - * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
> - * during cpu startup. Ignore the quiescent state.
> - */
> - if (likely(rdp->quiescbatch == rcp->cur))
> - cpu_quiet(rdp->cpu, rcp);
> -
> - spin_unlock(&rcp->lock);
> -}
> -
> -
> -#ifdef CONFIG_HOTPLUG_CPU
> -
> -/* warning! helper for rcu_offline_cpu. do not use elsewhere without reviewing
> - * locking requirements, the list it's pulling from has to belong to a cpu
> - * which is dead and hence not processing interrupts.
> - */
> -static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
> - struct rcu_head **tail)
> -{
> - local_irq_disable();
> - *this_rdp->nxttail = list;
> - if (list)
> - this_rdp->nxttail = tail;
> - local_irq_enable();
> -}
> -
> -static void __rcu_offline_cpu(struct rcu_data *this_rdp,
> - struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> -{
> - /* if the cpu going offline owns the grace period
> - * we can block indefinitely waiting for it, so flush
> - * it here
> - */
> - spin_lock_bh(&rcp->lock);
> - if (rcp->cur != rcp->completed)
> - cpu_quiet(rdp->cpu, rcp);
> - spin_unlock_bh(&rcp->lock);
> - rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
> - rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
> - rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
> -}
> -
> -static void rcu_offline_cpu(int cpu)
> -{
> - struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
> - struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
> -
> - __rcu_offline_cpu(this_rdp, &rcu_ctrlblk,
> - &per_cpu(rcu_data, cpu));
> - __rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk,
> - &per_cpu(rcu_bh_data, cpu));
> - put_cpu_var(rcu_data);
> - put_cpu_var(rcu_bh_data);
> - tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
> -}
> -
> -#else
>
> -static void rcu_offline_cpu(int cpu)
> -{
> -}
> -
> -#endif
> -
> -/*
> - * This does the RCU processing work from tasklet context.
> - */
> -static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
> - struct rcu_data *rdp)
> -{
> - if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
> - *rdp->donetail = rdp->curlist;
> - rdp->donetail = rdp->curtail;
> - rdp->curlist = NULL;
> - rdp->curtail = &rdp->curlist;
> - }
> -
> - if (rdp->nxtlist && !rdp->curlist) {
> - local_irq_disable();
> - rdp->curlist = rdp->nxtlist;
> - rdp->curtail = rdp->nxttail;
> - rdp->nxtlist = NULL;
> - rdp->nxttail = &rdp->nxtlist;
> - local_irq_enable();
> -
> - /*
> - * start the next batch of callbacks
> - */
> -
> - /* determine batch number */
> - rdp->batch = rcp->cur + 1;
> - /* see the comment and corresponding wmb() in
> - * the rcu_start_batch()
> - */
> - smp_rmb();
> -
> - if (!rcp->next_pending) {
> - /* and start it/schedule start if it's a new batch */
> - spin_lock(&rcp->lock);
> - rcp->next_pending = 1;
> - rcu_start_batch(rcp);
> - spin_unlock(&rcp->lock);
> - }
> - }
> -
> - rcu_check_quiescent_state(rcp, rdp);
> - if (rdp->donelist)
> - rcu_do_batch(rdp);
> -}
> -
> -static void rcu_process_callbacks(unsigned long unused)
> -{
> - __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
> - __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> -}
> -
> -static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> -{
> - /* This cpu has pending rcu entries and the grace period
> - * for them has completed.
> - */
> - if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
> - return 1;
> -
> - /* This cpu has no pending entries, but there are new entries */
> - if (!rdp->curlist && rdp->nxtlist)
> - return 1;
> -
> - /* This cpu has finished callbacks to invoke */
> - if (rdp->donelist)
> - return 1;
> -
> - /* The rcu core waits for a quiescent state from the cpu */
> - if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
> - return 1;
> -
> - /* nothing to do */
> - return 0;
> -}
> -
> -/*
> - * Check to see if there is any immediate RCU-related work to be done
> - * by the current CPU, returning 1 if so. This function is part of the
> - * RCU implementation; it is -not- an exported member of the RCU API.
> - */
> -int rcu_pending(int cpu)
> -{
> - return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
> - __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
> -}
> -
> -/*
> - * Check to see if any future RCU-related work will need to be done
> - * by the current CPU, even if none need be done immediately, returning
> - * 1 if so. This function is part of the RCU implementation; it is -not-
> - * an exported member of the RCU API.
> - */
> -int rcu_needs_cpu(int cpu)
> -{
> - struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> - struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
> -
> - return (!!rdp->curlist || !!rdp_bh->curlist || rcu_pending(cpu));
> -}
> -
> -void rcu_check_callbacks(int cpu, int user)
> -{
> - if (user ||
> - (idle_cpu(cpu) && !in_softirq() &&
> - hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
> - rcu_qsctr_inc(cpu);
> - rcu_bh_qsctr_inc(cpu);
> - } else if (!in_softirq())
> - rcu_bh_qsctr_inc(cpu);
> - tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
> -}
> -
> -static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
> - struct rcu_data *rdp)
> -{
> - memset(rdp, 0, sizeof(*rdp));
> - rdp->curtail = &rdp->curlist;
> - rdp->nxttail = &rdp->nxtlist;
> - rdp->donetail = &rdp->donelist;
> - rdp->quiescbatch = rcp->completed;
> - rdp->qs_pending = 0;
> - rdp->cpu = cpu;
> - rdp->blimit = blimit;
> -}
> -
> -static void __devinit rcu_online_cpu(int cpu)
> -{
> - struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> - struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
> -
> - rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
> - rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
> - tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
> -}
> -
> -static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
> - unsigned long action, void *hcpu)
> -{
> - long cpu = (long)hcpu;
> - switch (action) {
> - case CPU_UP_PREPARE:
> - rcu_online_cpu(cpu);
> - break;
> - case CPU_DEAD:
> - rcu_offline_cpu(cpu);
> - break;
> - default:
> - break;
> - }
> - return NOTIFY_OK;
> -}
> -
> -static struct notifier_block __cpuinitdata rcu_nb = {
> - .notifier_call = rcu_cpu_notify,
> -};
> -
> -/*
> - * Initializes rcu mechanism. Assumed to be called early.
> - * That is before local timer(SMP) or jiffie timer (uniproc) is setup.
> - * Note that rcu_qsctr and friends are implicitly
> - * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
> - */
> void __init rcu_init(void)
> {
> - rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
> - (void *)(long)smp_processor_id());
> - /* Register notifier for non-boot CPUs */
> - register_cpu_notifier(&rcu_nb);
> -}
> -
> -struct rcu_synchronize {
> - struct rcu_head head;
> - struct completion completion;
> -};
> -
> -/* Because of FASTCALL declaration of complete, we use this wrapper */
> -static void wakeme_after_rcu(struct rcu_head *head)
> -{
> - struct rcu_synchronize *rcu;
> -
> - rcu = container_of(head, struct rcu_synchronize, head);
> - complete(&rcu->completion);
> -}
> -
> -/**
> - * synchronize_rcu - wait until a grace period has elapsed.
> - *
> - * Control will return to the caller some time after a full grace
> - * period has elapsed, in other words after all currently executing RCU
> - * read-side critical sections have completed. RCU read-side critical
> - * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
> - * and may be nested.
> - *
> - * If your read-side code is not protected by rcu_read_lock(), do -not-
> - * use synchronize_rcu().
> - */
> -void synchronize_rcu(void)
> -{
> - struct rcu_synchronize rcu;
> -
> - init_completion(&rcu.completion);
> - /* Will wake me after RCU finished */
> - call_rcu(&rcu.head, wakeme_after_rcu);
> -
> - /* Wait for it */
> - wait_for_completion(&rcu.completion);
> + __rcu_init();
> }
> -
> -module_param(blimit, int, 0);
> -module_param(qhimark, int, 0);
> -module_param(qlowmark, int, 0);
> -EXPORT_SYMBOL_GPL(rcu_batches_completed);
> -EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
> -EXPORT_SYMBOL_GPL(call_rcu);
> -EXPORT_SYMBOL_GPL(call_rcu_bh);
> +
> +EXPORT_SYMBOL_GPL(rcu_barrier);
> EXPORT_SYMBOL_GPL(synchronize_rcu);
>
> _
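For reference, usage of the interfaces documented above does not change
with this split; below is a minimal, hypothetical sketch (not part of the
patch, all names invented) of the usual rcu_read_lock()/call_rcu() pattern
for an RCU-protected pointer, with the update side assumed to be
serialized by the caller.

#include <linux/kernel.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>

struct foo {
	int data;
	struct rcu_head rcu;		/* embedded for call_rcu() */
};

static struct foo *global_foo;		/* dereferenced under rcu_read_lock() */

static void foo_free_rcu(struct rcu_head *head)
{
	/* runs after a grace period, so no reader can still see the old foo */
	kfree(container_of(head, struct foo, rcu));
}

/* update side; assumed to be serialized by the caller */
static void foo_update(int new_data)
{
	struct foo *new = kmalloc(sizeof(*new), GFP_KERNEL);
	struct foo *old = global_foo;

	if (!new)
		return;
	new->data = new_data;
	rcu_assign_pointer(global_foo, new);
	if (old)
		call_rcu(&old->rcu, foo_free_rcu);
}

static int foo_read(void)
{
	struct foo *p;
	int val = -1;

	rcu_read_lock();
	p = rcu_dereference(global_foo);
	if (p)
		val = p->data;
	rcu_read_unlock();
	return val;
}

Callers only ever include rcupdate.h, so code like the above is unaffected
by which RCU implementation is selected.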

2007-01-16 17:48:11

by Paul E. McKenney

[permalink] [raw]
Subject: Re: [mm PATCH 2/6] RCU: softirq for RCU

On Tue, Jan 16, 2007 at 12:52:48AM +0530, Dipankar Sarma wrote:
>
>
> Finally, RCU gets its own softirq. With RCU being used extensively,
> the per-cpu tasklet used earlier was effectively just a softirq with
> extra overhead. This makes things more efficient.

Acked-by: Paul E. McKenney <[email protected]>

> Signed-off-by: Dipankar Sarma <[email protected]>
> ---
>
>
>
> diff -puN include/linux/interrupt.h~rcu-softirq include/linux/interrupt.h
> --- linux-2.6.20-rc3-mm1-rcu/include/linux/interrupt.h~rcu-softirq 2007-01-15 15:36:43.000000000 +0530
> +++ linux-2.6.20-rc3-mm1-rcu-dipankar/include/linux/interrupt.h 2007-01-15 15:36:43.000000000 +0530
> @@ -236,6 +236,7 @@ enum
> #ifdef CONFIG_HIGH_RES_TIMERS
> HRTIMER_SOFTIRQ,
> #endif
> + RCU_SOFTIRQ /* Preferably RCU should always be the last softirq */
> };
>
> /* softirq mask and active fields moved to irq_cpustat_t in
> diff -puN kernel/rcuclassic.c~rcu-softirq kernel/rcuclassic.c
> --- linux-2.6.20-rc3-mm1-rcu/kernel/rcuclassic.c~rcu-softirq 2007-01-15 15:36:43.000000000 +0530
> +++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcuclassic.c 2007-01-15 15:36:43.000000000 +0530
> @@ -69,7 +69,6 @@ DEFINE_PER_CPU(struct rcu_data, rcu_data
> DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
>
> /* Fake initialization required by compiler */
> -static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
> static int blimit = 10;
> static int qhimark = 10000;
> static int qlowmark = 100;
> @@ -215,7 +214,7 @@ static void rcu_do_batch(struct rcu_data
> if (!rdp->donelist)
> rdp->donetail = &rdp->donelist;
> else
> - tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
> + raise_softirq(RCU_SOFTIRQ);
> }
>
> /*
> @@ -367,7 +366,6 @@ static void rcu_offline_cpu(int cpu)
> &per_cpu(rcu_bh_data, cpu));
> put_cpu_var(rcu_data);
> put_cpu_var(rcu_bh_data);
> - tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
> }
>
> #else
> @@ -379,7 +377,7 @@ static void rcu_offline_cpu(int cpu)
> #endif
>
> /*
> - * This does the RCU processing work from tasklet context.
> + * This does the RCU processing work from softirq context.
> */
> static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
> struct rcu_data *rdp)
> @@ -424,7 +422,7 @@ static void __rcu_process_callbacks(stru
> rcu_do_batch(rdp);
> }
>
> -static void rcu_process_callbacks(unsigned long unused)
> +static void rcu_process_callbacks(struct softirq_action *unused)
> {
> __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
> __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> @@ -488,7 +486,7 @@ void rcu_check_callbacks(int cpu, int us
> rcu_bh_qsctr_inc(cpu);
> } else if (!in_softirq())
> rcu_bh_qsctr_inc(cpu);
> - tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
> + raise_softirq(RCU_SOFTIRQ);
> }
>
> static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
> @@ -511,7 +509,7 @@ static void __devinit rcu_online_cpu(int
>
> rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
> rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
> - tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
> + open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
> }
>
> static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
>
> _

2007-01-16 17:52:07

by Paul E. McKenney

[permalink] [raw]
Subject: Re: [mm PATCH 3/6] RCU: Fix barriers

On Tue, Jan 16, 2007 at 12:54:46AM +0530, Dipankar Sarma wrote:
>
>
> Fix rcu_barrier() to work properly in a preemptible kernel environment.
> Also, the ordering of callbacks must be preserved while moving
> callbacks to another CPU during CPU hotplug.

Acked-by: Paul E. McKenney <[email protected]>

> Signed-off-by: Dipankar Sarma <[email protected]>
> ---
>
>
>
> diff -puN kernel/rcuclassic.c~rcu-fix-barriers kernel/rcuclassic.c
> --- linux-2.6.20-rc3-mm1-rcu/kernel/rcuclassic.c~rcu-fix-barriers 2007-01-15 15:36:47.000000000 +0530
> +++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcuclassic.c 2007-01-15 15:36:47.000000000 +0530
> @@ -350,9 +350,9 @@ static void __rcu_offline_cpu(struct rcu
> if (rcp->cur != rcp->completed)
> cpu_quiet(rdp->cpu, rcp);
> spin_unlock_bh(&rcp->lock);
> + rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
> rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
> rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
> - rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
> }
>
> static void rcu_offline_cpu(int cpu)
> diff -puN kernel/rcupdate.c~rcu-fix-barriers kernel/rcupdate.c
> --- linux-2.6.20-rc3-mm1-rcu/kernel/rcupdate.c~rcu-fix-barriers 2007-01-15 15:36:47.000000000 +0530
> +++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcupdate.c 2007-01-15 15:36:47.000000000 +0530
> @@ -117,7 +117,18 @@ void rcu_barrier(void)
> mutex_lock(&rcu_barrier_mutex);
> init_completion(&rcu_barrier_completion);
> atomic_set(&rcu_barrier_cpu_count, 0);
> + /*
> + * The queueing of callbacks in all CPUs must be
> + * atomic with respect to RCU, otherwise one cpu may
> + * queue a callback, wait for a grace period, decrement
> + * barrier count and call complete(), while other CPUs
> + * haven't yet queued anything. So, we need to make sure
> + * that no grace period happens until all the callbacks
> + * are queued.
> + */
> + rcu_read_lock();
> on_each_cpu(rcu_barrier_func, NULL, 0, 1);
> + rcu_read_unlock();
> wait_for_completion(&rcu_barrier_completion);
> mutex_unlock(&rcu_barrier_mutex);
> }
>
> _

2007-01-16 17:54:58

by Paul E. McKenney

[permalink] [raw]
Subject: Re: [mm PATCH 6/6] RCU: trivial fixes

On Tue, Jan 16, 2007 at 01:01:03AM +0530, Dipankar Sarma wrote:
>
> Fix a few trivial things based on review comments.

Acked-by: Paul E. McKenney <[email protected]>

> Signed-off-by: Dipankar Sarma <[email protected]>
>
> ---
>
>
>
> diff -puN kernel/rcupreempt.c~rcu-fix-trivials kernel/rcupreempt.c
> --- linux-2.6.20-rc3-mm1-rcu/kernel/rcupreempt.c~rcu-fix-trivials 2007-01-15 15:37:00.000000000 +0530
> +++ linux-2.6.20-rc3-mm1-rcu-dipankar/kernel/rcupreempt.c 2007-01-15 15:37:00.000000000 +0530
> @@ -156,7 +156,7 @@ void __rcu_read_lock(void)
> local_irq_save(oldirq);
>
> /*
> - * Outermost nesting of rcu_read_lock(), so atomically
> + * Outermost nesting of rcu_read_lock(), so
> * increment the current counter for the current CPU.
> */
> idx = rcu_ctrlblk.completed & 0x1;
> @@ -169,7 +169,7 @@ void __rcu_read_lock(void)
> * Now that the per-CPU counter has been incremented, we
> * are protected. We can therefore safely increment
> * the nesting counter, relieving further NMIs of the
> - * need to do so.
> + * need to increment the per-CPU counter.
> */
> current->rcu_read_lock_nesting = nesting + 1;
> barrier();
>
> _

2007-01-24 00:35:20

by Andrew Morton

[permalink] [raw]
Subject: Re: [mm PATCH 4/6] RCU: preemptible RCU

On Tue, 16 Jan 2007 00:58:58 +0530
Dipankar Sarma <[email protected]> wrote:

> This patch implements a new version of RCU which allows its read-side
> critical sections to be preempted.

Why is it selectable if CONFIG_PREEMPT=n?

> It uses a set of counter pairs
> to keep track of the read-side critical sections and flips them
> when all tasks exit their read-side critical sections. The details
> of this implementation can be found in this paper -
>
> http://www.rdrop.com/users/paulmck/RCU/OLSrtRCU.2006.08.11a.pdf
>
> This patch was developed as a part of the -rt kernel
> development and meant to provide better latencies when
> read-side critical sections of RCU don't disable preemption.

Does it succeed in that attempt? Thus far you've given no reason for
merging this code..

This is a pile of tricky new core kernel code for us to test, maintain,
understand, debug, etc. It needs to provide a substantial benefit. Does
it?

> As a consequence of keeping track of RCU readers, the readers
> have a slight overhead (optimizations in the paper).
> This implementation co-exists with the "classic" RCU
> implementations and can be switched to at compile time.

That's yet another question we need to ask people when their kernel dies,
and yet another deviation between the kernels which we all test, causing
more dilution of testing efforts. It would be much better if we could
remove classic RCU. You say this would incur extra cost, but the magnitude
of that cost is not clear. Please help us make that decision.


2007-01-24 00:39:31

by Andrew Morton

[permalink] [raw]
Subject: Re: [mm PATCH 4/6] RCU: preemptible RCU

On Tue, 16 Jan 2007 00:58:58 +0530
Dipankar Sarma <[email protected]> wrote:

> +/*
> + * Wait for CPUs to acknowledge the flip.
> + */
> +static int rcu_try_flip_waitack(int flipctr)
> +{
> + int cpu;
> +
> + for_each_possible_cpu(cpu)
> + if (per_cpu(rcu_flip_flag, cpu) != RCU_FLIP_SEEN)
> + return 1;
> +
> + /*
> + * Make sure our checks above don't bleed into subsequent
> + * waiting for the sum of the counters to reach zero.
> + */
> + smp_mb();
> + return 0;
> +}

Confused. If some of the possible cpus aren't online, doesn't the
state machine get stuck??

2007-01-26 21:02:17

by Dipankar Sarma

[permalink] [raw]
Subject: Re: [mm PATCH 4/6] RCU: preemptible RCU

On Tue, Jan 23, 2007 at 04:32:59PM -0800, Andrew Morton wrote:
> On Tue, 16 Jan 2007 00:58:58 +0530
> Dipankar Sarma <[email protected]> wrote:
>
> > This patch implements a new version of RCU which allows its read-side
> > critical sections to be preempted.
>
> Why is it selectable if CONFIG_PREEMPT=n?

It would probably make sense to make CONFIG_RCU_PREEMPT dependent
on CONFIG_PREEMPT. I kept it independent because I wanted to
test rcupreempt as an equivalent RCU implementation not just
for its "preemptibility".

> > It uses a set of counter pairs
> > to keep track of the read-side critical sections and flips them
> > when all tasks exit their read-side critical sections. The details
> > of this implementation can be found in this paper -
> >
> > http://www.rdrop.com/users/paulmck/RCU/OLSrtRCU.2006.08.11a.pdf
> >
> > This patch was developed as a part of the -rt kernel
> > development and meant to provide better latencies when
> > read-side critical sections of RCU don't disable preemption.
>
> Does it succeed in that attempt? Thus far you've given no reason for
> merging this code..
>
> This is a pile of tricky new core kernel code for us to test, maintain,
> understand, debug, etc. It needs to provide a substantial benefit. Does
> it?

I am working to compare some latency numbers between different
implementations. I will have them out soon.


> > As a consequence of keeping track of RCU readers, the readers
> > have a slight overhead (optimizations in the paper).
> > This implementation co-exists with the "classic" RCU
> > implementations and can be switched to at compile time.
>
> That's yet another question we need to ask people when their kernel dies,
> and yet another deviation between the kernels which we all test, causing
> more dilution of testing efforts. It would be much better if we could
> remove classic RCU. You say this would incur extra cost, but the magnitude
> of that cost is not clear. Please help us make that decision.

See Table 2, page 10 of the paper mentioned above. There is a
~100ns cost per read-side critical section involved in the preemptible
version of RCU at the moment. Until we are sure that it doesn't have
an impact on common workloads, we need to keep the "classic"
implementation around.
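To give a rough picture of where that read-side cost comes from, here is
a toy, single-threaded user-space model of the counter-pair scheme quoted
above (illustration only, not the rcupreempt.c code; all names are
invented):

#include <assert.h>
#include <stdio.h>

#define NR_CPUS 4

static int completed;			/* grace-period / flip number */
static int flipctr[NR_CPUS][2];		/* per-CPU counter pairs */

static int toy_read_lock(int cpu)
{
	int idx = completed & 0x1;	/* cf. __rcu_read_lock() quoted earlier */

	flipctr[cpu][idx]++;		/* the extra work on every read side */
	return idx;
}

static void toy_read_unlock(int cpu, int idx)
{
	flipctr[cpu][idx]--;
}

static int old_readers_gone(void)
{
	int old = !(completed & 0x1), cpu, sum = 0;

	for (cpu = 0; cpu < NR_CPUS; cpu++)
		sum += flipctr[cpu][old];
	return sum == 0;
}

int main(void)
{
	int idx = toy_read_lock(0);	/* reader enters under the current index */

	completed++;			/* update side flips the index ... */
	toy_read_unlock(0, idx);	/* ... the reader finishes ... */
	assert(old_readers_gone());	/* ... and the old counters drain to zero */
	printf("grace period %d complete\n", completed);
	return 0;
}

The real implementation additionally needs the memory barriers and
nesting handling shown in the patch, which is roughly where the extra
cost per read-side critical section is spent.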

Thanks
Dipankar

2007-01-26 21:44:38

by Andrew Morton

[permalink] [raw]
Subject: Re: [mm PATCH 4/6] RCU: preemptible RCU

On Sat, 27 Jan 2007 02:30:17 +0530
Dipankar Sarma <[email protected]> wrote:

> > > As a consequence of keeping track of RCU readers, the readers
> > > have a slight overhead (optimizations in the paper).
> > > This implementation co-exists with the "classic" RCU
> > > implementations and can be switched to at compile time.
> >
> > That's yet another question we need to ask people when their kernel dies,
> > and yet another deviation between the kernels which we all test, causing
> > more dilution of testing efforts. It would be much better if we could
> > remove classic RCU. You say this would incur extra cost, but the magnitude
> > of that cost is not clear. Please help us make that decision.
>
> See Table 2, page 10 of the paper mentioned above.

argh.

Seems I have to wade through half the paper to understand Table 2.

> There is a
> ~100ns cost per read-side critical section involved in the preemptible
> version of RCU at the moment. Until we are sure that it doesn't have
> an impact on common workloads, we need to keep the "classic"
> implementation around.

Ratios, please.. that 100ns appears to be a 100% increase, i.e. 100ns -> 200ns.

There are a couple of ways of working out how much that really matters: a)
run a workload or b) instrument a kernel, work out how many times/sec the
kernel runs rcu_read_lock(). I suspect b) would be more useful and
informative.
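Something along these lines (an untested sketch with made-up names, not
in any posted patch) would be enough to get a ballpark figure for (b):
a per-CPU counter bumped from the lock path and a once-a-second rate
printed from a per-tick hook such as rcu_check_callbacks().

#include <linux/percpu.h>
#include <linux/jiffies.h>
#include <linux/kernel.h>
#include <linux/smp.h>

static DEFINE_PER_CPU(unsigned long, rcu_read_lock_count);
static DEFINE_PER_CPU(unsigned long, rcu_count_stamp);

/* bump a per-CPU counter; call this from rcu_read_lock() */
static inline void rcu_count_read_lock(void)
{
	per_cpu(rcu_read_lock_count, raw_smp_processor_id())++;
}

/* print an approximate per-second rate; call this once per tick */
static void rcu_count_report(int cpu)
{
	if (time_after(jiffies, per_cpu(rcu_count_stamp, cpu) + HZ)) {
		printk(KERN_DEBUG "cpu%d: ~%lu rcu_read_lock()/s\n",
		       cpu, per_cpu(rcu_read_lock_count, cpu));
		per_cpu(rcu_read_lock_count, cpu) = 0;
		per_cpu(rcu_count_stamp, cpu) = jiffies;
	}
}

If, say, that showed on the order of 100,000 rcu_read_lock() calls per
second per CPU (a made-up number), an extra 100ns each would be about
10ms of CPU time per second, i.e. around 1%.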

Either way, please always prepare such info up-front and summarise it in the
changelog? It's kinda important...