From: Rao Shoaib <[email protected]>
This patch updates kfree_rcu to use new bulk memory free functions as they
are more efficient. It also moves kfree_call_rcu() out of rcu related code to
mm/slab_common.c
Signed-off-by: Rao Shoaib <[email protected]>
---
include/linux/mm.h | 5 ++
kernel/rcu/tree.c | 14 ----
kernel/sysctl.c | 40 +++++++++++
mm/slab.h | 23 +++++++
mm/slab_common.c | 198 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
5 files changed, 264 insertions(+), 16 deletions(-)
diff --git a/include/linux/mm.h b/include/linux/mm.h
index ea818ff..8ae4f25 100644
--- a/include/linux/mm.h
+++ b/include/linux/mm.h
@@ -2669,5 +2669,10 @@ void __init setup_nr_node_ids(void);
static inline void setup_nr_node_ids(void) {}
#endif
+extern int sysctl_kfree_rcu_drain_limit;
+extern int sysctl_kfree_rcu_poll_limit;
+extern int sysctl_kfree_rcu_empty_limit;
+extern int sysctl_kfree_rcu_caching_allowed;
+
#endif /* __KERNEL__ */
#endif /* _LINUX_MM_H */
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index f9c0ca2..69951ef 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3209,20 +3209,6 @@ void call_rcu_bh(struct rcu_head *head, rcu_callback_t func)
EXPORT_SYMBOL_GPL(call_rcu_bh);
/*
- * Queue an RCU callback for lazy invocation after a grace period.
- * This will likely be later named something like "call_rcu_lazy()",
- * but this change will require some way of tagging the lazy RCU
- * callbacks in the list of pending callbacks. Until then, this
- * function may only be called from __kfree_rcu().
- */
-void kfree_call_rcu(struct rcu_head *head,
- rcu_callback_t func)
-{
- __call_rcu(head, func, rcu_state_p, -1, 1);
-}
-EXPORT_SYMBOL_GPL(kfree_call_rcu);
-
-/*
* Because a context switch is a grace period for RCU-sched and RCU-bh,
* any blocking grace-period wait automatically implies a grace period
* if there is only one CPU online at any point time during execution
diff --git a/kernel/sysctl.c b/kernel/sysctl.c
index 557d467..47b48f7 100644
--- a/kernel/sysctl.c
+++ b/kernel/sysctl.c
@@ -1655,6 +1655,46 @@ static struct ctl_table vm_table[] = {
.extra2 = (void *)&mmap_rnd_compat_bits_max,
},
#endif
+ {
+ .procname = "kfree_rcu_drain_limit",
+ .data = &sysctl_kfree_rcu_drain_limit,
+ .maxlen = sizeof(sysctl_kfree_rcu_drain_limit),
+ .mode = 0644,
+ .proc_handler = proc_dointvec_minmax,
+ .extra1 = &one,
+ .extra2 = &one_hundred,
+ },
+
+ {
+ .procname = "kfree_rcu_poll_limit",
+ .data = &sysctl_kfree_rcu_poll_limit,
+ .maxlen = sizeof(sysctl_kfree_rcu_poll_limit),
+ .mode = 0644,
+ .proc_handler = proc_dointvec_minmax,
+ .extra1 = &one,
+ .extra2 = &one_hundred,
+ },
+
+ {
+ .procname = "kfree_rcu_empty_limit",
+ .data = &sysctl_kfree_rcu_empty_limit,
+ .maxlen = sizeof(sysctl_kfree_rcu_empty_limit),
+ .mode = 0644,
+ .proc_handler = proc_dointvec_minmax,
+ .extra1 = &zero,
+ .extra2 = &four,
+ },
+
+ {
+ .procname = "kfree_rcu_caching_allowed",
+ .data = &sysctl_kfree_rcu_caching_allowed,
+ .maxlen = sizeof(sysctl_kfree_rcu_caching_allowed),
+ .mode = 0644,
+ .proc_handler = proc_dointvec_minmax,
+ .extra1 = &zero,
+ .extra2 = &one,
+ },
+
{ }
};
diff --git a/mm/slab.h b/mm/slab.h
index ad657ff..2541f70 100644
--- a/mm/slab.h
+++ b/mm/slab.h
@@ -78,6 +78,29 @@ extern const struct kmalloc_info_struct {
unsigned long size;
} kmalloc_info[];
+#define RCU_MAX_ACCUMULATE_SIZE 25
+
+struct rcu_bulk_free_container {
+ struct rcu_head rbfc_rcu;
+ int rbfc_entries;
+ void *rbfc_data[RCU_MAX_ACCUMULATE_SIZE];
+ struct rcu_bulk_free *rbfc_rbf;
+};
+
+struct rcu_bulk_free {
+ struct rcu_head rbf_rcu; /* used to schedule monitor process */
+ spinlock_t rbf_lock;
+ struct rcu_bulk_free_container *rbf_container;
+ struct rcu_bulk_free_container *rbf_cached_container;
+ struct rcu_head *rbf_list_head;
+ int rbf_list_size;
+ int rbf_cpu;
+ int rbf_empty;
+ int rbf_polled;
+ bool rbf_init;
+ bool rbf_monitor;
+};
+
unsigned long calculate_alignment(slab_flags_t flags,
unsigned long align, unsigned long size);
diff --git a/mm/slab_common.c b/mm/slab_common.c
index c8cb367..06fd12c 100644
--- a/mm/slab_common.c
+++ b/mm/slab_common.c
@@ -20,6 +20,7 @@
#include <asm/tlbflush.h>
#include <asm/page.h>
#include <linux/memcontrol.h>
+#include <linux/types.h>
#define CREATE_TRACE_POINTS
#include <trace/events/kmem.h>
@@ -129,6 +130,7 @@ int __kmem_cache_alloc_bulk(struct kmem_cache *s, gfp_t flags, size_t nr,
for (i = 0; i < nr; i++) {
void *x = p[i] = kmem_cache_alloc(s, flags);
+
if (!x) {
__kmem_cache_free_bulk(s, i, p);
return 0;
@@ -353,6 +355,7 @@ unsigned long calculate_alignment(slab_flags_t flags,
*/
if (flags & SLAB_HWCACHE_ALIGN) {
unsigned long ralign = cache_line_size();
+
while (size <= ralign / 2)
ralign /= 2;
align = max(align, ralign);
@@ -444,9 +447,8 @@ kmem_cache_create(const char *name, size_t size, size_t align,
mutex_lock(&slab_mutex);
err = kmem_cache_sanity_check(name, size);
- if (err) {
+ if (err)
goto out_unlock;
- }
/* Refuse requests with allocator specific flags */
if (flags & ~SLAB_FLAGS_PERMITTED) {
@@ -1131,6 +1133,7 @@ EXPORT_SYMBOL(kmalloc_order);
void *kmalloc_order_trace(size_t size, gfp_t flags, unsigned int order)
{
void *ret = kmalloc_order(size, flags, order);
+
trace_kmalloc(_RET_IP_, ret, size, PAGE_SIZE << order, flags);
return ret;
}
@@ -1483,6 +1486,197 @@ void kzfree(const void *p)
}
EXPORT_SYMBOL(kzfree);
+static DEFINE_PER_CPU(struct rcu_bulk_free, cpu_rbf);
+
+/* drain if at least this many objects */
+int sysctl_kfree_rcu_drain_limit __read_mostly = 10;
+
+/* time to poll if fewer than drain_limit */
+int sysctl_kfree_rcu_poll_limit __read_mostly = 5;
+
+/* number of empty passes before the monitor exits */
+int sysctl_kfree_rcu_empty_limit __read_mostly = 2;
+
+int sysctl_kfree_rcu_caching_allowed __read_mostly = 1;
+
+/* RCU callback function. Frees the memory */
+static void
+__rcu_bulk_free_impl(struct rcu_head *rbfc_rcu)
+{
+ struct rcu_bulk_free *rbf = NULL;
+ struct rcu_bulk_free_container *rbfc = container_of(rbfc_rcu,
+ struct rcu_bulk_free_container, rbfc_rcu);
+
+ WARN_ON(rbfc->rbfc_entries <= 0);
+ kfree_bulk(rbfc->rbfc_entries, rbfc->rbfc_data);
+
+ rbf = rbfc->rbfc_rbf;
+ if (!sysctl_kfree_rcu_caching_allowed ||
+ cmpxchg(&rbf->rbf_cached_container, NULL, rbfc) != NULL) {
+ kfree(rbfc);
+ }
+}
+
+/* processes list of rcu structures
+ * used when conatiner can not be allocated
+ */
+
+static void
+__rcu_bulk_schedule_list(struct rcu_bulk_free *rbf)
+{
+ int i = 0;
+
+ for (i = 0; i < rbf->rbf_list_size; i++) {
+ struct rcu_head *free_head;
+
+ free_head = rbf->rbf_list_head;
+ rbf->rbf_list_head = free_head->next;
+ free_head->next = NULL;
+ call_rcu(free_head, free_head->func);
+ }
+ WARN_ON(rbf->rbf_list_head != NULL);
+ rbf->rbf_list_size = 0;
+}
+
+
+/* RCU monitoring function -- submits elements for RCU reclaim */
+static void
+__rcu_bulk_free_monitor(struct rcu_head *rbf_rcu)
+{
+ struct rcu_bulk_free *rbf = NULL;
+ struct rcu_bulk_free_container *rbfc = NULL;
+
+ rbf = container_of(rbf_rcu, struct rcu_bulk_free, rbf_rcu);
+
+ spin_lock(&rbf->rbf_lock);
+
+ rbfc = rbf->rbf_container;
+
+ if (rbf->rbf_list_size > 0) {
+ WARN_ON(rbfc != NULL);
+ if ((rbf->rbf_list_size >= sysctl_kfree_rcu_drain_limit) ||
+ rbf->rbf_polled >= sysctl_kfree_rcu_poll_limit) {
+ rbf->rbf_polled = 0;
+ __rcu_bulk_schedule_list(rbf);
+ } else {
+ rbf->rbf_polled++;
+ }
+ } else if (rbfc != NULL) {
+ WARN_ON(rbfc->rbfc_entries <= 0);
+ if ((rbfc->rbfc_entries > sysctl_kfree_rcu_drain_limit) ||
+ rbf->rbf_polled++ >= sysctl_kfree_rcu_poll_limit) {
+ rbf->rbf_polled = 0;
+ call_rcu(&rbfc->rbfc_rcu, __rcu_bulk_free_impl);
+ rbf->rbf_container = NULL;
+ }
+ } else {
+ /* Nothing to do, keep track */
+ rbf->rbf_empty++;
+ }
+
+ if (rbf->rbf_empty >= sysctl_kfree_rcu_empty_limit) {
+ rbf->rbf_monitor = false;
+ rbf->rbf_empty = 0;
+ }
+
+ spin_unlock(&rbf->rbf_lock);
+
+ if (rbf->rbf_monitor)
+ call_rcu(&rbf->rbf_rcu, __rcu_bulk_free_monitor);
+}
+
+/* Main RCU function that is called to free RCU structures */
+static void
+__rcu_bulk_free(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
+{
+ unsigned long offset;
+ void *ptr;
+ struct rcu_bulk_free *rbf;
+ struct rcu_bulk_free_container *rbfc = NULL;
+
+ rbf = this_cpu_ptr(&cpu_rbf);
+
+ if (unlikely(!rbf->rbf_init)) {
+ spin_lock_init(&rbf->rbf_lock);
+ rbf->rbf_cpu = smp_processor_id();
+ rbf->rbf_init = true;
+ }
+
+ /* hold lock to protect against other cpu's */
+ spin_lock_bh(&rbf->rbf_lock);
+
+ rbfc = rbf->rbf_container;
+
+ if (rbfc == NULL) {
+ if (rbf->rbf_cached_container == NULL) {
+ rbf->rbf_container =
+ kmalloc(sizeof(struct rcu_bulk_free_container),
+ GFP_ATOMIC);
+ rbf->rbf_container->rbfc_rbf = rbf;
+ } else {
+ rbf->rbf_container = rbf->rbf_cached_container;
+ rbf->rbf_container->rbfc_rbf = rbf;
+ cmpxchg(&rbf->rbf_cached_container,
+ rbf->rbf_cached_container, NULL);
+ }
+
+ if (unlikely(rbf->rbf_container == NULL)) {
+
+ /* Memory allocation failed maintain a list */
+
+ head->func = (void *)func;
+ head->next = rbf->rbf_list_head;
+ rbf->rbf_list_head = head;
+ rbf->rbf_list_size++;
+ if (rbf->rbf_list_size == RCU_MAX_ACCUMULATE_SIZE)
+ __rcu_bulk_schedule_list(rbf);
+
+ goto done;
+ }
+
+ rbfc = rbf->rbf_container;
+ rbfc->rbfc_entries = 0;
+
+ if (rbf->rbf_list_head != NULL)
+ __rcu_bulk_schedule_list(rbf);
+ }
+
+ offset = (unsigned long)func;
+ ptr = (void *)head - offset;
+
+ rbfc->rbfc_data[rbfc->rbfc_entries++] = ptr;
+ if (rbfc->rbfc_entries == RCU_MAX_ACCUMULATE_SIZE) {
+
+ WRITE_ONCE(rbf->rbf_container, NULL);
+ spin_unlock_bh(&rbf->rbf_lock);
+ call_rcu(&rbfc->rbfc_rcu, __rcu_bulk_free_impl);
+ return;
+ }
+
+done:
+ if (!rbf->rbf_monitor) {
+
+ call_rcu(&rbf->rbf_rcu, __rcu_bulk_free_monitor);
+ rbf->rbf_monitor = true;
+ }
+
+ spin_unlock_bh(&rbf->rbf_lock);
+}
+
+/*
+ * Queue an RCU callback for lazy invocation after a grace period.
+ * This will likely be later named something like "call_rcu_lazy()",
+ * but this change will require some way of tagging the lazy RCU
+ * callbacks in the list of pending callbacks. Until then, this
+ * function may only be called from __kfree_rcu().
+ */
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+{
+ __rcu_bulk_free(head, func, -1, 1);
+}
+EXPORT_SYMBOL_GPL(kfree_call_rcu);
+
+
/* Tracepoints definitions. */
EXPORT_TRACEPOINT_SYMBOL(kmalloc);
EXPORT_TRACEPOINT_SYMBOL(kmem_cache_alloc);
--
2.7.4
On Tue, Dec 19, 2017 at 09:52:27AM -0800, [email protected] wrote:
> This patch updates kfree_rcu to use new bulk memory free functions as they
> are more efficient. It also moves kfree_call_rcu() out of rcu related code to
> mm/slab_common.c
>
> Signed-off-by: Rao Shoaib <[email protected]>
> ---
> include/linux/mm.h | 5 ++
> kernel/rcu/tree.c | 14 ----
> kernel/sysctl.c | 40 +++++++++++
> mm/slab.h | 23 +++++++
> mm/slab_common.c | 198 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
> 5 files changed, 264 insertions(+), 16 deletions(-)
You've added an awful lot of code. Do you have any performance measurements
that show this to be a win?
On Tue, Dec 19, 2017 at 09:52:27AM -0800, [email protected] wrote:
> @@ -129,6 +130,7 @@ int __kmem_cache_alloc_bulk(struct kmem_cache *s, gfp_t flags, size_t nr,
>
> for (i = 0; i < nr; i++) {
> void *x = p[i] = kmem_cache_alloc(s, flags);
> +
> if (!x) {
> __kmem_cache_free_bulk(s, i, p);
> return 0;
Don't mix whitespace changes with significant patches.
> +/* Main RCU function that is called to free RCU structures */
> +static void
> +__rcu_bulk_free(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
> +{
> + unsigned long offset;
> + void *ptr;
> + struct rcu_bulk_free *rbf;
> + struct rcu_bulk_free_container *rbfc = NULL;
> +
> + rbf = this_cpu_ptr(&cpu_rbf);
> +
> + if (unlikely(!rbf->rbf_init)) {
> + spin_lock_init(&rbf->rbf_lock);
> + rbf->rbf_cpu = smp_processor_id();
> + rbf->rbf_init = true;
> + }
> +
> + /* hold lock to protect against other cpu's */
> + spin_lock_bh(&rbf->rbf_lock);
Are you sure we can't call kfree_rcu() from interrupt context?
> + rbfc = rbf->rbf_container;
> + rbfc->rbfc_entries = 0;
> +
> + if (rbf->rbf_list_head != NULL)
> + __rcu_bulk_schedule_list(rbf);
You've broken RCU. Consider this scenario:
Thread 1                Thread 2                Thread 3
kfree_rcu(a)
                        schedule()
schedule()
                        gets pointer to b
kfree_rcu(b)
                                                processes rcu callbacks
                        uses b
Thread 3 will free a and also free b, so thread 2 is going to use freed
memory and go splat. You can't batch up memory to be freed without
taking into account the grace periods.
It might make sense for RCU to batch up all the memory it's going to free
in a single grace period, and hand it all off to slub at once, but that's
not what you've done here.
I've been doing a lot of thinking about this because I really want a
way to kfree_rcu() an object without embedding a struct rcu_head in it.
But I see no way to do that today; even if we have an external memory
allocation to point to the object to be freed, we have to keep track of
the grace periods.
On Tue, 19 Dec 2017 09:52:27 -0800 [email protected] wrote:
> diff --git a/mm/slab_common.c b/mm/slab_common.c
> index c8cb367..06fd12c 100644
> --- a/mm/slab_common.c
> +++ b/mm/slab_common.c
> @@ -20,6 +20,7 @@
> #include <asm/tlbflush.h>
> #include <asm/page.h>
> #include <linux/memcontrol.h>
> +#include <linux/types.h>
>
> #define CREATE_TRACE_POINTS
> #include <trace/events/kmem.h>
> @@ -129,6 +130,7 @@ int __kmem_cache_alloc_bulk(struct kmem_cache *s, gfp_t flags, size_t nr,
>
> for (i = 0; i < nr; i++) {
> void *x = p[i] = kmem_cache_alloc(s, flags);
> +
> if (!x) {
> __kmem_cache_free_bulk(s, i, p);
> return 0;
> @@ -353,6 +355,7 @@ unsigned long calculate_alignment(slab_flags_t flags,
> */
> if (flags & SLAB_HWCACHE_ALIGN) {
> unsigned long ralign = cache_line_size();
> +
> while (size <= ralign / 2)
> ralign /= 2;
> align = max(align, ralign);
> @@ -444,9 +447,8 @@ kmem_cache_create(const char *name, size_t size, size_t align,
> mutex_lock(&slab_mutex);
>
> err = kmem_cache_sanity_check(name, size);
> - if (err) {
> + if (err)
> goto out_unlock;
> - }
>
> /* Refuse requests with allocator specific flags */
> if (flags & ~SLAB_FLAGS_PERMITTED) {
> @@ -1131,6 +1133,7 @@ EXPORT_SYMBOL(kmalloc_order);
> void *kmalloc_order_trace(size_t size, gfp_t flags, unsigned int order)
> {
> void *ret = kmalloc_order(size, flags, order);
> +
> trace_kmalloc(_RET_IP_, ret, size, PAGE_SIZE << order, flags);
> return ret;
> }
Looks like you are mixing in cleanups (which should be avoided, and
instead moved to another patch).
> @@ -1483,6 +1486,197 @@ void kzfree(const void *p)
[...]
> +
> +/* processes list of rcu structures
> + * used when conatiner can not be allocated
> + */
Spelling.
--
Best regards,
Jesper Dangaard Brouer
MSc.CS, Principal Kernel Engineer at Red Hat
LinkedIn: http://www.linkedin.com/in/brouer
On Tue, 19 Dec 2017, [email protected] wrote:
> This patch updates kfree_rcu to use new bulk memory free functions as they
> are more efficient. It also moves kfree_call_rcu() out of rcu related code to
> mm/slab_common.c
It would be great to have separate patches so that we can review it
properly:
1. Move the code into slab_common.c
2. The actual code changes to the kfree rcu mechanism
3. The whitespace changes
On 12/19/2017 11:12 AM, Matthew Wilcox wrote:
> On Tue, Dec 19, 2017 at 09:52:27AM -0800, [email protected] wrote:
>> This patch updates kfree_rcu to use new bulk memory free functions as they
>> are more efficient. It also moves kfree_call_rcu() out of rcu related code to
>> mm/slab_common.c
>>
>> Signed-off-by: Rao Shoaib <[email protected]>
>> ---
>> include/linux/mm.h | 5 ++
>> kernel/rcu/tree.c | 14 ----
>> kernel/sysctl.c | 40 +++++++++++
>> mm/slab.h | 23 +++++++
>> mm/slab_common.c | 198 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>> 5 files changed, 264 insertions(+), 16 deletions(-)
> You've added an awful lot of code. Do you have any performance measurements
> that show this to be a win?
I did some micro-benchmarking when I was developing the code and did see
performance gains -- see attached.
I tried several networking benchmarks but was not able to get any
improvement. The reason is that these benchmarks do not exercise the
code we are improving. So I looked at the kernel source for users of
kfree_rcu(). It turns out that the directory-deletion code calls kfree_rcu()
to free the data structure when an entry is deleted. Based on that I
created two benchmarks.
1) make_dirs -- This benchmark creates a multi-level directory structure
and then deletes it. It's the delete part where we see the performance
gain of about 8.3%. The creation time remains the same.
This benchmark was derived from the fdtree benchmark at
https://computing.llnl.gov/?set=code&page=sio_downloads ==>
https://github.com/llnl/fdtree
2) tsock -- I also noticed that a socket has an entry in a directory, and
when the socket is closed the directory entry is deleted. So I wrote a
simple benchmark that loops a million times, opening and closing
10 sockets per iteration. This shows an improvement of 7.6%.
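The core of tsock is essentially a loop like the following (a simplified
sketch only; the attached benchmark has the full details and error
handling):

	#include <unistd.h>
	#include <sys/socket.h>

	int main(void)
	{
		int i, j, fds[10];

		for (i = 0; i < 1000000; i++) {		/* one million iterations */
			for (j = 0; j < 10; j++)	/* open 10 sockets ... */
				fds[j] = socket(AF_INET, SOCK_STREAM, 0);
			for (j = 0; j < 10; j++)	/* ... and close them */
				close(fds[j]);
		}
		return 0;
	}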
I have attached the benchmarks and results. "Unchanged" results are for
the stock kernel, "Changed" results are for the modified kernel.
Shoaib
On 12/19/2017 11:30 AM, Matthew Wilcox wrote:
> On Tue, Dec 19, 2017 at 09:52:27AM -0800, [email protected] wrote:
>> @@ -129,6 +130,7 @@ int __kmem_cache_alloc_bulk(struct kmem_cache *s, gfp_t flags, size_t nr,
>>
>> for (i = 0; i < nr; i++) {
>> void *x = p[i] = kmem_cache_alloc(s, flags);
>> +
>> if (!x) {
>> __kmem_cache_free_bulk(s, i, p);
>> return 0;
> Don't mix whitespace changes with significant patches.
OK.
>
>> +/* Main RCU function that is called to free RCU structures */
>> +static void
>> +__rcu_bulk_free(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
>> +{
>> + unsigned long offset;
>> + void *ptr;
>> + struct rcu_bulk_free *rbf;
>> + struct rcu_bulk_free_container *rbfc = NULL;
>> +
>> + rbf = this_cpu_ptr(&cpu_rbf);
>> +
>> + if (unlikely(!rbf->rbf_init)) {
>> + spin_lock_init(&rbf->rbf_lock);
>> + rbf->rbf_cpu = smp_processor_id();
>> + rbf->rbf_init = true;
>> + }
>> +
>> + /* hold lock to protect against other cpu's */
>> + spin_lock_bh(&rbf->rbf_lock);
> Are you sure we can't call kfree_rcu() from interrupt context?
I thought about it, but interrupts are off due to acquiring the
lock. No?
>
>> + rbfc = rbf->rbf_container;
>> + rbfc->rbfc_entries = 0;
>> +
>> + if (rbf->rbf_list_head != NULL)
>> + __rcu_bulk_schedule_list(rbf);
> You've broken RCU. Consider this scenario:
>
> Thread 1                Thread 2                Thread 3
> kfree_rcu(a)
>                         schedule()
> schedule()
>                         gets pointer to b
> kfree_rcu(b)
>                                                 processes rcu callbacks
>                         uses b
>
> Thread 3 will free a and also free b, so thread 2 is going to use freed
> memory and go splat. You can't batch up memory to be freed without
> taking into account the grace periods.
The code does not change the grace period at all; in fact it adds to the
grace period. The frees are accumulated in an array, and when a certain
limit/time is reached the frees are submitted to RCU for freeing. So the
grace period is maintained starting from the time of the last free.
In case the memory allocation fails, the code uses a list that is also
submitted to RCU for freeing.
>
> It might make sense for RCU to batch up all the memory it's going to free
> in a single grace period, and hand it all off to slub at once, but that's
> not what you've done here.
I am kind of doing that, but on a per-CPU basis rather than per grace
period.
>
>
> I've been doing a lot of thinking about this because I really want a
> way to kfree_rcu() an object without embedding a struct rcu_head in it.
> But I see no way to do that today; even if we have an external memory
> allocation to point to the object to be freed, we have to keep track of
> the grace periods.
I am not sure I understand. If you had external memory you could easily do
that.
I am doing exactly that; the only reason the RCU structure is needed is
to get the pointer to the object being freed.
Shoaib
On 12/19/2017 11:33 AM, Christopher Lameter wrote:
> On Tue, 19 Dec 2017, [email protected] wrote:
>
>> This patch updates kfree_rcu to use new bulk memory free functions as they
>> are more efficient. It also moves kfree_call_rcu() out of rcu related code to
>> mm/slab_common.c
> It would be great to have separate patches so that we can review it
> properly:
>
> 1. Move the code into slab_common.c
> 2. The actual code changes to the kfree rcu mechanism
> 3. The whitespace changes
>
> --
> To unsubscribe, send a message with 'unsubscribe linux-mm' in
> the body to [email protected]. For more info on Linux MM,
> see: http://www.linux-mm.org/ .
> Don't email: <a href=mailto:"[email protected]"> [email protected] </a>
I can certainly break down the patch and submit smaller patches as you
have suggested.
BTW -- This is my first ever patch to Linux, so I am still learning the
etiquette.
Shoaib
On Tue, Dec 19, 2017 at 11:56:30AM -0800, Rao Shoaib wrote:
> On 12/19/2017 11:30 AM, Matthew Wilcox wrote:
> >On Tue, Dec 19, 2017 at 09:52:27AM -0800, [email protected] wrote:
[ . . . ]
> >I've been doing a lot of thinking about this because I really want a
> >way to kfree_rcu() an object without embedding a struct rcu_head in it.
> >But I see no way to do that today; even if we have an external memory
> >allocation to point to the object to be freed, we have to keep track of
> >the grace periods.
> I am not sure I understand. If you had external memory you can
> easily do that.
> I am exactly doing that, the only reason the RCU structure is needed
> is to get the pointer to the object being freed.
This can be done as long as you are willing to either:
1. Occasionally have kfree_rcu() wait for a grace period.
2. Occasionally have kfree_rcu() allocate memory.
3. Keep the rcu_head, but use it only when you would otherwise
have to accept the above two penalties. (The point of this
is that tracking lists of memory waiting for a grace period
using dense arrays improves cache locality.)
There might be others, and if you come up with one, please don't keep it
a secret. The C++ standards committee insisted on an interface using
option #2 above. (There is also an option to use their equivalent of
an rcu_head.)
Thanx, Paul
On Tue, 19 Dec 2017 09:52:27 -0800 [email protected] wrote:
> +/* Main RCU function that is called to free RCU structures */
> +static void
> +__rcu_bulk_free(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
> +{
> + unsigned long offset;
> + void *ptr;
> + struct rcu_bulk_free *rbf;
> + struct rcu_bulk_free_container *rbfc = NULL;
> +
> + rbf = this_cpu_ptr(&cpu_rbf);
> +
> + if (unlikely(!rbf->rbf_init)) {
> + spin_lock_init(&rbf->rbf_lock);
> + rbf->rbf_cpu = smp_processor_id();
> + rbf->rbf_init = true;
> + }
> +
> + /* hold lock to protect against other cpu's */
> + spin_lock_bh(&rbf->rbf_lock);
I'm not sure this will be faster. Having to take a cross CPU lock here
(+ BH-disable) could cause scaling issues. Hopefully this lock will
not be used intensively by other CPUs, right?
The current cost of __call_rcu() is a local_irq_save/restore (which is
quite expensive, but doesn't cause cross CPU chatter).
Later in __rcu_process_callbacks() we have a local_irq_save/restore for
the entire list, plus a per object cost doing local_bh_disable/enable.
And for each object we call __rcu_reclaim(), which in some cases
directly call kfree().
If I had to implement this: I would choose to do the optimization in
__rcu_process_callbacks() create small on-call-stack ptr-array for
kfree_bulk(). I would only optimize the case that call kfree()
directly. In the while(list) loop I would defer calling
__rcu_reclaim() for __is_kfree_rcu_offset(head->func), and instead add
them to the ptr-array (and flush if the array is full in loop, and
kfree_bulk flush after loop).
The real advantage of kfree_bulk() comes from amortizing the per kfree
(behind-the-scenes) sync cost. There is an additional benefit, because
objects comes from RCU and will hit a slower path in SLUB. The SLUB
allocator is very fast for objects that gets recycled quickly (short
lifetime), non-locked (cpu-local) double-cmpxchg. But slower for
longer-lived/more-outstanding objects, as this hits a slower code-path,
fully locked (cross-cpu) double-cmpxchg.
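To illustrate the call pattern I mean (just a sketch; next_object_to_free()
is a stand-in for wherever the pointers actually come from):

	void *batch[16];
	size_t n = 0;
	void *obj;

	while ((obj = next_object_to_free()) != NULL) {
		batch[n++] = obj;
		if (n == ARRAY_SIZE(batch)) {
			/* one call frees the whole batch; the per-object
			 * sync cost is paid once per flush, not per kfree()
			 */
			kfree_bulk(n, batch);
			n = 0;
		}
	}
	if (n)
		kfree_bulk(n, batch);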
> +
> + rbfc = rbf->rbf_container;
> +
> + if (rbfc == NULL) {
> + if (rbf->rbf_cached_container == NULL) {
> + rbf->rbf_container =
> + kmalloc(sizeof(struct rcu_bulk_free_container),
> + GFP_ATOMIC);
> + rbf->rbf_container->rbfc_rbf = rbf;
> + } else {
> + rbf->rbf_container = rbf->rbf_cached_container;
> + rbf->rbf_container->rbfc_rbf = rbf;
> + cmpxchg(&rbf->rbf_cached_container,
> + rbf->rbf_cached_container, NULL);
> + }
> +
> + if (unlikely(rbf->rbf_container == NULL)) {
> +
> + /* Memory allocation failed maintain a list */
> +
> + head->func = (void *)func;
> + head->next = rbf->rbf_list_head;
> + rbf->rbf_list_head = head;
> + rbf->rbf_list_size++;
> + if (rbf->rbf_list_size == RCU_MAX_ACCUMULATE_SIZE)
> + __rcu_bulk_schedule_list(rbf);
> +
> + goto done;
> + }
> +
> + rbfc = rbf->rbf_container;
> + rbfc->rbfc_entries = 0;
> +
> + if (rbf->rbf_list_head != NULL)
> + __rcu_bulk_schedule_list(rbf);
> + }
> +
> + offset = (unsigned long)func;
> + ptr = (void *)head - offset;
> +
> + rbfc->rbfc_data[rbfc->rbfc_entries++] = ptr;
> + if (rbfc->rbfc_entries == RCU_MAX_ACCUMULATE_SIZE) {
> +
> + WRITE_ONCE(rbf->rbf_container, NULL);
> + spin_unlock_bh(&rbf->rbf_lock);
> + call_rcu(&rbfc->rbfc_rcu, __rcu_bulk_free_impl);
> + return;
> + }
> +
> +done:
> + if (!rbf->rbf_monitor) {
> +
> + call_rcu(&rbf->rbf_rcu, __rcu_bulk_free_monitor);
> + rbf->rbf_monitor = true;
> + }
> +
> + spin_unlock_bh(&rbf->rbf_lock);
> +}
--
Best regards,
Jesper Dangaard Brouer
MSc.CS, Principal Kernel Engineer at Red Hat
LinkedIn: http://www.linkedin.com/in/brouer
On Tue, Dec 19, 2017 at 09:41:58PM +0100, Jesper Dangaard Brouer wrote:
>
> On Tue, 19 Dec 2017 09:52:27 -0800 [email protected] wrote:
>
> > +/* Main RCU function that is called to free RCU structures */
> > +static void
> > +__rcu_bulk_free(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
> > +{
> > + unsigned long offset;
> > + void *ptr;
> > + struct rcu_bulk_free *rbf;
> > + struct rcu_bulk_free_container *rbfc = NULL;
> > +
> > + rbf = this_cpu_ptr(&cpu_rbf);
> > +
> > + if (unlikely(!rbf->rbf_init)) {
> > + spin_lock_init(&rbf->rbf_lock);
> > + rbf->rbf_cpu = smp_processor_id();
> > + rbf->rbf_init = true;
> > + }
> > +
> > + /* hold lock to protect against other cpu's */
> > + spin_lock_bh(&rbf->rbf_lock);
>
> I'm not sure this will be faster. Having to take a cross CPU lock here
> (+ BH-disable) could cause scaling issues. Hopefully this lock will
> not be used intensively by other CPUs, right?
>
>
> The current cost of __call_rcu() is a local_irq_save/restore (which is
> quite expensive, but doesn't cause cross CPU chatter).
>
> Later in __rcu_process_callbacks() we have a local_irq_save/restore for
> the entire list, plus a per object cost doing local_bh_disable/enable.
> And for each object we call __rcu_reclaim(), which in some cases
> directly call kfree().
Isn't this lock in a per-CPU object? It -might- go cross-CPU in response
to CPU-hotplug operations, but that should be rare.
Thanx, Paul
> If I had to implement this: I would choose to do the optimization in
> __rcu_process_callbacks() create small on-call-stack ptr-array for
> kfree_bulk(). I would only optimize the case that call kfree()
> directly. In the while(list) loop I would defer calling
> __rcu_reclaim() for __is_kfree_rcu_offset(head->func), and instead add
> them to the ptr-array (and flush if the array is full in loop, and
> kfree_bulk flush after loop).
>
> The real advantage of kfree_bulk() comes from amortizing the per kfree
> (behind-the-scenes) sync cost. There is an additional benefit, because
> objects comes from RCU and will hit a slower path in SLUB. The SLUB
> allocator is very fast for objects that gets recycled quickly (short
> lifetime), non-locked (cpu-local) double-cmpxchg. But slower for
> longer-lived/more-outstanding objects, as this hits a slower code-path,
> fully locked (cross-cpu) double-cmpxchg.
>
> > +
> > + rbfc = rbf->rbf_container;
> > +
> > + if (rbfc == NULL) {
> > + if (rbf->rbf_cached_container == NULL) {
> > + rbf->rbf_container =
> > + kmalloc(sizeof(struct rcu_bulk_free_container),
> > + GFP_ATOMIC);
> > + rbf->rbf_container->rbfc_rbf = rbf;
> > + } else {
> > + rbf->rbf_container = rbf->rbf_cached_container;
> > + rbf->rbf_container->rbfc_rbf = rbf;
> > + cmpxchg(&rbf->rbf_cached_container,
> > + rbf->rbf_cached_container, NULL);
> > + }
> > +
> > + if (unlikely(rbf->rbf_container == NULL)) {
> > +
> > + /* Memory allocation failed maintain a list */
> > +
> > + head->func = (void *)func;
> > + head->next = rbf->rbf_list_head;
> > + rbf->rbf_list_head = head;
> > + rbf->rbf_list_size++;
> > + if (rbf->rbf_list_size == RCU_MAX_ACCUMULATE_SIZE)
> > + __rcu_bulk_schedule_list(rbf);
> > +
> > + goto done;
> > + }
> > +
> > + rbfc = rbf->rbf_container;
> > + rbfc->rbfc_entries = 0;
> > +
> > + if (rbf->rbf_list_head != NULL)
> > + __rcu_bulk_schedule_list(rbf);
> > + }
> > +
> > + offset = (unsigned long)func;
> > + ptr = (void *)head - offset;
> > +
> > + rbfc->rbfc_data[rbfc->rbfc_entries++] = ptr;
> > + if (rbfc->rbfc_entries == RCU_MAX_ACCUMULATE_SIZE) {
> > +
> > + WRITE_ONCE(rbf->rbf_container, NULL);
> > + spin_unlock_bh(&rbf->rbf_lock);
> > + call_rcu(&rbfc->rbfc_rcu, __rcu_bulk_free_impl);
> > + return;
> > + }
> > +
> > +done:
> > + if (!rbf->rbf_monitor) {
> > +
> > + call_rcu(&rbf->rbf_rcu, __rcu_bulk_free_monitor);
> > + rbf->rbf_monitor = true;
> > + }
> > +
> > + spin_unlock_bh(&rbf->rbf_lock);
> > +}
>
>
> --
> Best regards,
> Jesper Dangaard Brouer
> MSc.CS, Principal Kernel Engineer at Red Hat
> LinkedIn: http://www.linkedin.com/in/brouer
>
On 12/19/2017 12:41 PM, Jesper Dangaard Brouer wrote:
> On Tue, 19 Dec 2017 09:52:27 -0800 [email protected] wrote:
>
>> +/* Main RCU function that is called to free RCU structures */
>> +static void
>> +__rcu_bulk_free(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
>> +{
>> + unsigned long offset;
>> + void *ptr;
>> + struct rcu_bulk_free *rbf;
>> + struct rcu_bulk_free_container *rbfc = NULL;
>> +
>> + rbf = this_cpu_ptr(&cpu_rbf);
>> +
>> + if (unlikely(!rbf->rbf_init)) {
>> + spin_lock_init(&rbf->rbf_lock);
>> + rbf->rbf_cpu = smp_processor_id();
>> + rbf->rbf_init = true;
>> + }
>> +
>> + /* hold lock to protect against other cpu's */
>> + spin_lock_bh(&rbf->rbf_lock);
> I'm not sure this will be faster. Having to take a cross CPU lock here
> (+ BH-disable) could cause scaling issues. Hopefully this lock will
> not be used intensively by other CPUs, right?
>
>
> The current cost of __call_rcu() is a local_irq_save/restore (which is
> quite expensive, but doesn't cause cross CPU chatter).
>
> Later in __rcu_process_callbacks() we have a local_irq_save/restore for
> the entire list, plus a per object cost doing local_bh_disable/enable.
> And for each object we call __rcu_reclaim(), which in some cases
> directly call kfree().
As Paul has pointed out, the lock is a per-CPU lock; the only reason for
another CPU to access this lock is if the RCU callbacks run on a
different CPU. There is nothing the code can do to avoid that, but it
should be rare anyway.
>
>
> If I had to implement this: I would choose to do the optimization in
> __rcu_process_callbacks() create small on-call-stack ptr-array for
> kfree_bulk(). I would only optimize the case that call kfree()
> directly. In the while(list) loop I would defer calling
> __rcu_reclaim() for __is_kfree_rcu_offset(head->func), and instead add
> them to the ptr-array (and flush if the array is full in loop, and
> kfree_bulk flush after loop).
Perhaps I do not understand your point, but this is exactly what the
current code is doing. It accumulates only the calls made via
   __kfree_rcu(head, offset) ==> kfree_call_rcu() ==> __rcu_bulk_free()
__kfree_rcu() has a check to make sure that an offset is being passed;
when a function pointer is passed, the caller has to use
call_rcu()/call_rcu_sched() instead.
Accumulating early avoids the individual cost of calling __call_rcu().
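For reference, the offset encoding works roughly as sketched below (a
paraphrase of the mechanism, not the verbatim rcupdate.h macros;
example_kfree_rcu is an illustrative name):

	/*
	 * kfree_rcu(ptr, field) passes the offset of the embedded rcu_head
	 * as the "callback" value; offsets are small, so they can be told
	 * apart from real function pointers.
	 */
	#define example_kfree_rcu(ptr, field)				\
		kfree_call_rcu(&(ptr)->field,				\
			       (rcu_callback_t)(unsigned long)		\
			       offsetof(typeof(*(ptr)), field))

	/* Reclaim side: recover the start of the allocation from the
	 * rcu_head address, as __rcu_bulk_free() above does.
	 */
	unsigned long offset = (unsigned long)head->func;

	if (__is_kfree_rcu_offset(offset))
		kfree((void *)head - offset);
	else
		head->func(head);	/* a real callback */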
Shoaib
>
> The real advantage of kfree_bulk() comes from amortizing the per kfree
> (behind-the-scenes) sync cost. There is an additional benefit, because
> objects comes from RCU and will hit a slower path in SLUB. The SLUB
> allocator is very fast for objects that gets recycled quickly (short
> lifetime), non-locked (cpu-local) double-cmpxchg. But slower for
> longer-lived/more-outstanding objects, as this hits a slower code-path,
> fully locked (cross-cpu) double-cmpxchg.
>
>> +
>> + rbfc = rbf->rbf_container;
>> +
>> + if (rbfc == NULL) {
>> + if (rbf->rbf_cached_container == NULL) {
>> + rbf->rbf_container =
>> + kmalloc(sizeof(struct rcu_bulk_free_container),
>> + GFP_ATOMIC);
>> + rbf->rbf_container->rbfc_rbf = rbf;
>> + } else {
>> + rbf->rbf_container = rbf->rbf_cached_container;
>> + rbf->rbf_container->rbfc_rbf = rbf;
>> + cmpxchg(&rbf->rbf_cached_container,
>> + rbf->rbf_cached_container, NULL);
>> + }
>> +
>> + if (unlikely(rbf->rbf_container == NULL)) {
>> +
>> + /* Memory allocation failed maintain a list */
>> +
>> + head->func = (void *)func;
>> + head->next = rbf->rbf_list_head;
>> + rbf->rbf_list_head = head;
>> + rbf->rbf_list_size++;
>> + if (rbf->rbf_list_size == RCU_MAX_ACCUMULATE_SIZE)
>> + __rcu_bulk_schedule_list(rbf);
>> +
>> + goto done;
>> + }
>> +
>> + rbfc = rbf->rbf_container;
>> + rbfc->rbfc_entries = 0;
>> +
>> + if (rbf->rbf_list_head != NULL)
>> + __rcu_bulk_schedule_list(rbf);
>> + }
>> +
>> + offset = (unsigned long)func;
>> + ptr = (void *)head - offset;
>> +
>> + rbfc->rbfc_data[rbfc->rbfc_entries++] = ptr;
>> + if (rbfc->rbfc_entries == RCU_MAX_ACCUMULATE_SIZE) {
>> +
>> + WRITE_ONCE(rbf->rbf_container, NULL);
>> + spin_unlock_bh(&rbf->rbf_lock);
>> + call_rcu(&rbfc->rbfc_rcu, __rcu_bulk_free_impl);
>> + return;
>> + }
>> +
>> +done:
>> + if (!rbf->rbf_monitor) {
>> +
>> + call_rcu(&rbf->rbf_rcu, __rcu_bulk_free_monitor);
>> + rbf->rbf_monitor = true;
>> + }
>> +
>> + spin_unlock_bh(&rbf->rbf_lock);
>> +}
>
On Tue, Dec 19, 2017 at 09:41:58PM +0100, Jesper Dangaard Brouer wrote:
> If I had to implement this: I would choose to do the optimization in
> __rcu_process_callbacks() create small on-call-stack ptr-array for
> kfree_bulk(). I would only optimize the case that call kfree()
> directly. In the while(list) loop I would defer calling
> __rcu_reclaim() for __is_kfree_rcu_offset(head->func), and instead add
> them to the ptr-array (and flush if the array is full in loop, and
> kfree_bulk flush after loop).
>
> The real advantage of kfree_bulk() comes from amortizing the per kfree
> (behind-the-scenes) sync cost. There is an additional benefit, because
> objects comes from RCU and will hit a slower path in SLUB. The SLUB
> allocator is very fast for objects that gets recycled quickly (short
> lifetime), non-locked (cpu-local) double-cmpxchg. But slower for
> longer-lived/more-outstanding objects, as this hits a slower code-path,
> fully locked (cross-cpu) double-cmpxchg.
Something like this ... (compile tested only)
Considerably less code; Rao, what do you think?
diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
index 59c471de342a..5ac4ed077233 100644
--- a/kernel/rcu/rcu.h
+++ b/kernel/rcu/rcu.h
@@ -174,20 +174,19 @@ static inline void debug_rcu_head_unqueue(struct rcu_head *head)
}
#endif /* #else !CONFIG_DEBUG_OBJECTS_RCU_HEAD */
-void kfree(const void *);
-
/*
* Reclaim the specified callback, either by invoking it (non-lazy case)
* or freeing it directly (lazy case). Return true if lazy, false otherwise.
*/
-static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head)
+static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head, void **kfree,
+ unsigned int *idx)
{
unsigned long offset = (unsigned long)head->func;
rcu_lock_acquire(&rcu_callback_map);
if (__is_kfree_rcu_offset(offset)) {
RCU_TRACE(trace_rcu_invoke_kfree_callback(rn, head, offset);)
- kfree((void *)head - offset);
+ kfree[*idx++] = (void *)head - offset;
rcu_lock_release(&rcu_callback_map);
return true;
} else {
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index f9c0ca2ccf0c..7e13979b4697 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2725,6 +2725,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
struct rcu_head *rhp;
struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
long bl, count;
+ void *to_free[16];
+ unsigned int to_free_idx = 0;
/* If no callbacks are ready, just return. */
if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
@@ -2755,8 +2757,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
rhp = rcu_cblist_dequeue(&rcl);
for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
debug_rcu_head_unqueue(rhp);
- if (__rcu_reclaim(rsp->name, rhp))
+ if (__rcu_reclaim(rsp->name, rhp, to_free, &to_free_idx))
rcu_cblist_dequeued_lazy(&rcl);
+ if (to_free_idx == 16)
+ kfree_bulk(16, to_free);
/*
* Stop only if limit reached and CPU has something to do.
* Note: The rcl structure counts down from zero.
@@ -2766,6 +2770,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
(!is_idle_task(current) && !rcu_is_callbacks_kthread())))
break;
}
+ if (to_free_idx)
+ kfree_bulk(to_free_idx, to_free);
local_irq_save(flags);
count = -rcl.len;
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index db85ca3975f1..4127be06759b 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -2189,6 +2189,8 @@ static int rcu_nocb_kthread(void *arg)
struct rcu_head *next;
struct rcu_head **tail;
struct rcu_data *rdp = arg;
+ void *to_free[16];
+ unsigned int to_free_idx = 0;
/* Each pass through this loop invokes one batch of callbacks */
for (;;) {
@@ -2226,13 +2228,18 @@ static int rcu_nocb_kthread(void *arg)
}
debug_rcu_head_unqueue(list);
local_bh_disable();
- if (__rcu_reclaim(rdp->rsp->name, list))
+ if (__rcu_reclaim(rdp->rsp->name, list, to_free,
+ &to_free_idx))
cl++;
c++;
+ if (to_free_idx == 16)
+ kfree_bulk(16, to_free);
local_bh_enable();
cond_resched_rcu_qs();
list = next;
}
+ if (to_free_idx)
+ kfree_bulk(to_free_idx, to_free);
trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
smp_mb__before_atomic(); /* _add after CB invocation. */
atomic_long_add(-c, &rdp->nocb_q_count);
On 12/19/2017 02:12 PM, Matthew Wilcox wrote:
> On Tue, Dec 19, 2017 at 09:41:58PM +0100, Jesper Dangaard Brouer wrote:
>> If I had to implement this: I would choose to do the optimization in
>> __rcu_process_callbacks() create small on-call-stack ptr-array for
>> kfree_bulk(). I would only optimize the case that call kfree()
>> directly. In the while(list) loop I would defer calling
>> __rcu_reclaim() for __is_kfree_rcu_offset(head->func), and instead add
>> them to the ptr-array (and flush if the array is full in loop, and
>> kfree_bulk flush after loop).
>>
>> The real advantage of kfree_bulk() comes from amortizing the per kfree
>> (behind-the-scenes) sync cost. There is an additional benefit, because
>> objects comes from RCU and will hit a slower path in SLUB. The SLUB
>> allocator is very fast for objects that gets recycled quickly (short
>> lifetime), non-locked (cpu-local) double-cmpxchg. But slower for
>> longer-lived/more-outstanding objects, as this hits a slower code-path,
>> fully locked (cross-cpu) double-cmpxchg.
> Something like this ... (compile tested only)
>
> Considerably less code; Rao, what do you think?
>
> diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
> index 59c471de342a..5ac4ed077233 100644
> --- a/kernel/rcu/rcu.h
> +++ b/kernel/rcu/rcu.h
> @@ -174,20 +174,19 @@ static inline void debug_rcu_head_unqueue(struct rcu_head *head)
> }
> #endif /* #else !CONFIG_DEBUG_OBJECTS_RCU_HEAD */
>
> -void kfree(const void *);
> -
> /*
> * Reclaim the specified callback, either by invoking it (non-lazy case)
> * or freeing it directly (lazy case). Return true if lazy, false otherwise.
> */
> -static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head)
> +static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head, void **kfree,
> + unsigned int *idx)
> {
> unsigned long offset = (unsigned long)head->func;
>
> rcu_lock_acquire(&rcu_callback_map);
> if (__is_kfree_rcu_offset(offset)) {
> RCU_TRACE(trace_rcu_invoke_kfree_callback(rn, head, offset);)
> - kfree((void *)head - offset);
> + kfree[*idx++] = (void *)head - offset;
> rcu_lock_release(&rcu_callback_map);
> return true;
> } else {
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index f9c0ca2ccf0c..7e13979b4697 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -2725,6 +2725,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> struct rcu_head *rhp;
> struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
> long bl, count;
> + void *to_free[16];
> + unsigned int to_free_idx = 0;
>
> /* If no callbacks are ready, just return. */
> if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
> @@ -2755,8 +2757,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> rhp = rcu_cblist_dequeue(&rcl);
> for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
> debug_rcu_head_unqueue(rhp);
> - if (__rcu_reclaim(rsp->name, rhp))
> + if (__rcu_reclaim(rsp->name, rhp, to_free, &to_free_idx))
> rcu_cblist_dequeued_lazy(&rcl);
> + if (to_free_idx == 16)
> + kfree_bulk(16, to_free);
> /*
> * Stop only if limit reached and CPU has something to do.
> * Note: The rcl structure counts down from zero.
> @@ -2766,6 +2770,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
> break;
> }
> + if (to_free_idx)
> + kfree_bulk(to_free_idx, to_free);
>
> local_irq_save(flags);
> count = -rcl.len;
> diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
> index db85ca3975f1..4127be06759b 100644
> --- a/kernel/rcu/tree_plugin.h
> +++ b/kernel/rcu/tree_plugin.h
> @@ -2189,6 +2189,8 @@ static int rcu_nocb_kthread(void *arg)
> struct rcu_head *next;
> struct rcu_head **tail;
> struct rcu_data *rdp = arg;
> + void *to_free[16];
> + unsigned int to_free_idx = 0;
>
> /* Each pass through this loop invokes one batch of callbacks */
> for (;;) {
> @@ -2226,13 +2228,18 @@ static int rcu_nocb_kthread(void *arg)
> }
> debug_rcu_head_unqueue(list);
> local_bh_disable();
> - if (__rcu_reclaim(rdp->rsp->name, list))
> + if (__rcu_reclaim(rdp->rsp->name, list, to_free,
> + &to_free_idx))
> cl++;
> c++;
> + if (to_free_idx == 16)
> + kfree_bulk(16, to_free);
> local_bh_enable();
> cond_resched_rcu_qs();
> list = next;
> }
> + if (to_free_idx)
> + kfree_bulk(to_free_idx, to_free);
> trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
> smp_mb__before_atomic(); /* _add after CB invocation. */
> atomic_long_add(-c, &rdp->nocb_q_count);
This is definitely less code, and I believe this is what I tried initially.
With this approach we are accumulating for one cycle only, and
__call_rcu() is called for each object, though I am not sure that is
an issue, because my change has to hold a lock. If everyone else thinks
this is good enough then we should just go with this; there is no need
to make unnecessary changes.
Please let me know so I do not have to submit a patch :-)
Shoaib.
On Tue, Dec 19, 2017 at 02:12:06PM -0800, Matthew Wilcox wrote:
> On Tue, Dec 19, 2017 at 09:41:58PM +0100, Jesper Dangaard Brouer wrote:
> > If I had to implement this: I would choose to do the optimization in
> > __rcu_process_callbacks() create small on-call-stack ptr-array for
> > kfree_bulk(). I would only optimize the case that call kfree()
> > directly. In the while(list) loop I would defer calling
> > __rcu_reclaim() for __is_kfree_rcu_offset(head->func), and instead add
> > them to the ptr-array (and flush if the array is full in loop, and
> > kfree_bulk flush after loop).
> >
> > The real advantage of kfree_bulk() comes from amortizing the per kfree
> > (behind-the-scenes) sync cost. There is an additional benefit, because
> > objects comes from RCU and will hit a slower path in SLUB. The SLUB
> > allocator is very fast for objects that gets recycled quickly (short
> > lifetime), non-locked (cpu-local) double-cmpxchg. But slower for
> > longer-lived/more-outstanding objects, as this hits a slower code-path,
> > fully locked (cross-cpu) double-cmpxchg.
>
> Something like this ... (compile tested only)
>
> Considerably less code; Rao, what do you think?
I am sorry, but I am not at all fan of this approach.
If we are going to make this sort of change, we should do so in a way
that allows the slab code to actually do the optimizations that might
make this sort of thing worthwhile. After all, if the main goal was small
code size, the best approach is to drop kfree_bulk() and get on with life
in the usual fashion.
I would prefer to believe that something like kfree_bulk() can help,
and if that is the case, we should give it a chance to do things like
group kfree_rcu() requests by destination slab and soforth, allowing
batching optimizations that might provide more significant increases
in performance. Furthermore, having this in slab opens the door to
slab taking emergency action when memory is low.
But for the patch below, NAK.
Thanx, Paul
> diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
> index 59c471de342a..5ac4ed077233 100644
> --- a/kernel/rcu/rcu.h
> +++ b/kernel/rcu/rcu.h
> @@ -174,20 +174,19 @@ static inline void debug_rcu_head_unqueue(struct rcu_head *head)
> }
> #endif /* #else !CONFIG_DEBUG_OBJECTS_RCU_HEAD */
>
> -void kfree(const void *);
> -
> /*
> * Reclaim the specified callback, either by invoking it (non-lazy case)
> * or freeing it directly (lazy case). Return true if lazy, false otherwise.
> */
> -static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head)
> +static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head, void **kfree,
> + unsigned int *idx)
> {
> unsigned long offset = (unsigned long)head->func;
>
> rcu_lock_acquire(&rcu_callback_map);
> if (__is_kfree_rcu_offset(offset)) {
> RCU_TRACE(trace_rcu_invoke_kfree_callback(rn, head, offset);)
> - kfree((void *)head - offset);
> + kfree[*idx++] = (void *)head - offset;
> rcu_lock_release(&rcu_callback_map);
> return true;
> } else {
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index f9c0ca2ccf0c..7e13979b4697 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -2725,6 +2725,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> struct rcu_head *rhp;
> struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
> long bl, count;
> + void *to_free[16];
> + unsigned int to_free_idx = 0;
>
> /* If no callbacks are ready, just return. */
> if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
> @@ -2755,8 +2757,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> rhp = rcu_cblist_dequeue(&rcl);
> for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
> debug_rcu_head_unqueue(rhp);
> - if (__rcu_reclaim(rsp->name, rhp))
> + if (__rcu_reclaim(rsp->name, rhp, to_free, &to_free_idx))
> rcu_cblist_dequeued_lazy(&rcl);
> + if (to_free_idx == 16)
> + kfree_bulk(16, to_free);
> /*
> * Stop only if limit reached and CPU has something to do.
> * Note: The rcl structure counts down from zero.
> @@ -2766,6 +2770,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
> break;
> }
> + if (to_free_idx)
> + kfree_bulk(to_free_idx, to_free);
>
> local_irq_save(flags);
> count = -rcl.len;
> diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
> index db85ca3975f1..4127be06759b 100644
> --- a/kernel/rcu/tree_plugin.h
> +++ b/kernel/rcu/tree_plugin.h
> @@ -2189,6 +2189,8 @@ static int rcu_nocb_kthread(void *arg)
> struct rcu_head *next;
> struct rcu_head **tail;
> struct rcu_data *rdp = arg;
> + void *to_free[16];
> + unsigned int to_free_idx = 0;
>
> /* Each pass through this loop invokes one batch of callbacks */
> for (;;) {
> @@ -2226,13 +2228,18 @@ static int rcu_nocb_kthread(void *arg)
> }
> debug_rcu_head_unqueue(list);
> local_bh_disable();
> - if (__rcu_reclaim(rdp->rsp->name, list))
> + if (__rcu_reclaim(rdp->rsp->name, list, to_free,
> + &to_free_idx))
> cl++;
> c++;
> + if (to_free_idx == 16)
> + kfree_bulk(16, to_free);
> local_bh_enable();
> cond_resched_rcu_qs();
> list = next;
> }
> + if (to_free_idx)
> + kfree_bulk(to_free_idx, to_free);
> trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
> smp_mb__before_atomic(); /* _add after CB invocation. */
> atomic_long_add(-c, &rdp->nocb_q_count);
>
On Tue, 19 Dec 2017, Rao Shoaib wrote:
> > > mm/slab_common.c
> > It would be great to have separate patches so that we can review it
> > properly:
> >
> > 1. Move the code into slab_common.c
> > 2. The actual code changes to the kfree rcu mechanism
> > 3. The whitespace changes
> I can certainly break down the patch and submit smaller patches as you have
> suggested.
>
> BTW -- This is my first ever patch to Linux, so I am still learning the
> etiquette.
You are doing great. Keep at improving the patches and we will get your
changes into the kernel source. If you want to send your first patchset
then a tool like "quilt" or "git" might be helpful.
On Tue, Dec 19, 2017 at 04:20:51PM -0800, Paul E. McKenney wrote:
> If we are going to make this sort of change, we should do so in a way
> that allows the slab code to actually do the optimizations that might
> make this sort of thing worthwhile. After all, if the main goal was small
> code size, the best approach is to drop kfree_bulk() and get on with life
> in the usual fashion.
>
> I would prefer to believe that something like kfree_bulk() can help,
> and if that is the case, we should give it a chance to do things like
> group kfree_rcu() requests by destination slab and soforth, allowing
> batching optimizations that might provide more significant increases
> in performance. Furthermore, having this in slab opens the door to
> slab taking emergency action when memory is low.
kfree_bulk does sort by destination slab; look at build_detached_freelist.
On Tue, Dec 19, 2017 at 05:53:36PM -0800, Matthew Wilcox wrote:
> On Tue, Dec 19, 2017 at 04:20:51PM -0800, Paul E. McKenney wrote:
> > If we are going to make this sort of change, we should do so in a way
> > that allows the slab code to actually do the optimizations that might
> > make this sort of thing worthwhile. After all, if the main goal was small
> > code size, the best approach is to drop kfree_bulk() and get on with life
> > in the usual fashion.
> >
> > I would prefer to believe that something like kfree_bulk() can help,
> > and if that is the case, we should give it a chance to do things like
> > group kfree_rcu() requests by destination slab and soforth, allowing
> > batching optimizations that might provide more significant increases
> > in performance. Furthermore, having this in slab opens the door to
> > slab taking emergency action when memory is low.
>
> kfree_bulk does sort by destination slab; look at build_detached_freelist.
Understood, but beside the point. I suspect that giving it larger
scope makes it more efficient, similar to disk drives in the old days.
Grouping on the stack when processing RCU callbacks limits what can
reasonably be done. Furthermore, using the vector approach going into the
grace period is much more cache-efficient than the linked-list approach,
given that the blocks have a reasonable chance of going cache-cold during
the grace period.
And the slab-related operations should really be in the slab code in any
case rather than within RCU.
Thanx, Paul
On Tue, 19 Dec 2017 16:20:51 -0800
"Paul E. McKenney" <[email protected]> wrote:
> On Tue, Dec 19, 2017 at 02:12:06PM -0800, Matthew Wilcox wrote:
> > On Tue, Dec 19, 2017 at 09:41:58PM +0100, Jesper Dangaard Brouer wrote:
> > > If I had to implement this: I would choose to do the optimization in
> > > __rcu_process_callbacks() create small on-call-stack ptr-array for
> > > kfree_bulk(). I would only optimize the case that call kfree()
> > > directly. In the while(list) loop I would defer calling
> > > __rcu_reclaim() for __is_kfree_rcu_offset(head->func), and instead add
> > > them to the ptr-array (and flush if the array is full in loop, and
> > > kfree_bulk flush after loop).
> > >
> > > The real advantage of kfree_bulk() comes from amortizing the per kfree
> > > (behind-the-scenes) sync cost. There is an additional benefit, because
> > > objects comes from RCU and will hit a slower path in SLUB. The SLUB
> > > allocator is very fast for objects that gets recycled quickly (short
> > > lifetime), non-locked (cpu-local) double-cmpxchg. But slower for
> > > longer-lived/more-outstanding objects, as this hits a slower code-path,
> > > fully locked (cross-cpu) double-cmpxchg.
> >
> > Something like this ... (compile tested only)
Yes, exactly.
> > Considerably less code; Rao, what do you think?
>
> I am sorry, but I am not at all fan of this approach.
>
> If we are going to make this sort of change, we should do so in a way
> that allows the slab code to actually do the optimizations that might
> make this sort of thing worthwhile. After all, if the main goal was small
> code size, the best approach is to drop kfree_bulk() and get on with life
> in the usual fashion.
>
> I would prefer to believe that something like kfree_bulk() can help,
> and if that is the case, we should give it a chance to do things like
> group kfree_rcu() requests by destination slab and soforth, allowing
> batching optimizations that might provide more significant increases
> in performance. Furthermore, having this in slab opens the door to
> slab taking emergency action when memory is low.
I agree with your argument. In the (slub) code I do handle different
destination slabs, but only with a limited look-ahead to find
same-destination slabs, which gives the speedup (see build_detached_freelist).
We have a larger and more consistent speedup potential if we add
infrastructure to pre-sort by destination slab before
invoking kfree_bulk(). In that respect, Rao's patch is a better
approach.
> But for the patch below, NAK.
>
> Thanx, Paul
>
> > diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
> > index 59c471de342a..5ac4ed077233 100644
> > --- a/kernel/rcu/rcu.h
> > +++ b/kernel/rcu/rcu.h
> > @@ -174,20 +174,19 @@ static inline void debug_rcu_head_unqueue(struct rcu_head *head)
> > }
> > #endif /* #else !CONFIG_DEBUG_OBJECTS_RCU_HEAD */
> >
> > -void kfree(const void *);
> > -
> > /*
> > * Reclaim the specified callback, either by invoking it (non-lazy case)
> > * or freeing it directly (lazy case). Return true if lazy, false otherwise.
> > */
> > -static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head)
> > +static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head, void **kfree,
> > + unsigned int *idx)
> > {
> > unsigned long offset = (unsigned long)head->func;
> >
> > rcu_lock_acquire(&rcu_callback_map);
> > if (__is_kfree_rcu_offset(offset)) {
> > RCU_TRACE(trace_rcu_invoke_kfree_callback(rn, head, offset);)
> > - kfree((void *)head - offset);
> > + kfree[*idx++] = (void *)head - offset;
> > rcu_lock_release(&rcu_callback_map);
> > return true;
> > } else {
> > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> > index f9c0ca2ccf0c..7e13979b4697 100644
> > --- a/kernel/rcu/tree.c
> > +++ b/kernel/rcu/tree.c
> > @@ -2725,6 +2725,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> > struct rcu_head *rhp;
> > struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
> > long bl, count;
> > + void *to_free[16];
> > + unsigned int to_free_idx = 0;
> >
> > /* If no callbacks are ready, just return. */
> > if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
> > @@ -2755,8 +2757,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> > rhp = rcu_cblist_dequeue(&rcl);
> > for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
> > debug_rcu_head_unqueue(rhp);
> > - if (__rcu_reclaim(rsp->name, rhp))
> > + if (__rcu_reclaim(rsp->name, rhp, to_free, &to_free_idx))
> > rcu_cblist_dequeued_lazy(&rcl);
> > + if (to_free_idx == 16)
> > + kfree_bulk(16, to_free);
> > /*
> > * Stop only if limit reached and CPU has something to do.
> > * Note: The rcl structure counts down from zero.
> > @@ -2766,6 +2770,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> > (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
> > break;
> > }
> > + if (to_free_idx)
> > + kfree_bulk(to_free_idx, to_free);
> >
> > local_irq_save(flags);
> > count = -rcl.len;
> > diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
> > index db85ca3975f1..4127be06759b 100644
> > --- a/kernel/rcu/tree_plugin.h
> > +++ b/kernel/rcu/tree_plugin.h
> > @@ -2189,6 +2189,8 @@ static int rcu_nocb_kthread(void *arg)
> > struct rcu_head *next;
> > struct rcu_head **tail;
> > struct rcu_data *rdp = arg;
> > + void *to_free[16];
> > + unsigned int to_free_idx = 0;
> >
> > /* Each pass through this loop invokes one batch of callbacks */
> > for (;;) {
> > @@ -2226,13 +2228,18 @@ static int rcu_nocb_kthread(void *arg)
> > }
> > debug_rcu_head_unqueue(list);
> > local_bh_disable();
> > - if (__rcu_reclaim(rdp->rsp->name, list))
> > + if (__rcu_reclaim(rdp->rsp->name, list, to_free,
> > + &to_free_idx))
> > cl++;
> > c++;
> > + if (to_free_idx == 16)
> > + kfree_bulk(16, to_free);
> > local_bh_enable();
> > cond_resched_rcu_qs();
> > list = next;
> > }
> > + if (to_free_idx)
> > + kfree_bulk(to_free_idx, to_free);
> > trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
> > smp_mb__before_atomic(); /* _add after CB invocation. */
> > atomic_long_add(-c, &rdp->nocb_q_count);
> >
>
--
Best regards,
Jesper Dangaard Brouer
MSc.CS, Principal Kernel Engineer at Red Hat
LinkedIn: http://www.linkedin.com/in/brouer
On Tue, 19 Dec 2017 13:20:43 -0800 Rao Shoaib <[email protected]> wrote:
> On 12/19/2017 12:41 PM, Jesper Dangaard Brouer wrote:
> > On Tue, 19 Dec 2017 09:52:27 -0800 [email protected] wrote:
> >
> >> +/* Main RCU function that is called to free RCU structures */
> >> +static void
> >> +__rcu_bulk_free(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
> >> +{
> >> + unsigned long offset;
> >> + void *ptr;
> >> + struct rcu_bulk_free *rbf;
> >> + struct rcu_bulk_free_container *rbfc = NULL;
> >> +
> >> + rbf = this_cpu_ptr(&cpu_rbf);
> >> +
> >> + if (unlikely(!rbf->rbf_init)) {
> >> + spin_lock_init(&rbf->rbf_lock);
> >> + rbf->rbf_cpu = smp_processor_id();
> >> + rbf->rbf_init = true;
> >> + }
> >> +
> >> + /* hold lock to protect against other cpu's */
> >> + spin_lock_bh(&rbf->rbf_lock);
> >
> > I'm not sure this will be faster. Having to take a cross CPU lock here
> > (+ BH-disable) could cause scaling issues. Hopefully this lock will
> > not be used intensively by other CPUs, right?
> >
[...]
>
> As Paul has pointed out, the lock is a per-CPU lock; the only reason for
> another CPU to access this lock is if the RCU callbacks run on a
> different CPU. There is nothing the code can do to avoid that, but it
> should be rare anyway.
(looping in Paul's comment)
On Tue, 19 Dec 2017 12:56:29 -0800
"Paul E. McKenney" <[email protected]> wrote:
> Isn't this lock in a per-CPU object? It -might- go cross-CPU in response
> to CPU-hotplug operations, but that should be rare.
Point taken. If this lock is very unlikely to be taken on another CPU
then I withdraw my performance concerns (the cacheline can hopefully
stay in Modified(M) state on this CPU, and all other CPUs will have it
in Invalid(I) state, based on the MESI cache coherence protocol view[1]).
The lock's atomic operation does have some overhead; _later_ we
could get fancy and use a seqlock (include/linux/seqlock.h) to remove
that.
[1] https://en.wikipedia.org/wiki/MESI_protocol
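Roughly the seqlock pattern I have in mind (generic seqcount usage only,
not a worked-out design for struct rcu_bulk_free; the write side still
needs its own serialization, e.g. staying on the local CPU):

	static seqcount_t rbf_seq = SEQCNT_ZERO(rbf_seq);

	/* write side (local CPU, already serialized) */
	write_seqcount_begin(&rbf_seq);
	/* ... update the per-CPU state ... */
	write_seqcount_end(&rbf_seq);

	/* read side (e.g. an RCU callback running on another CPU) */
	unsigned int seq;
	do {
		seq = read_seqcount_begin(&rbf_seq);
		/* ... read the per-CPU state ... */
	} while (read_seqcount_retry(&rbf_seq, seq));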
--
Best regards,
Jesper Dangaard Brouer
MSc.CS, Principal Kernel Engineer at Red Hat
LinkedIn: http://www.linkedin.com/in/brouer
On Tue 19-12-17 12:02:03, Rao Shoaib wrote:
> BTW -- This is my first ever patch to Linux, so I am still learning the
> etiquette.
Reading through Documentation/process/submitting-patches.rst might be
really helpful.
Good luck!
--
Michal Hocko
SUSE Labs
On Tue, 19 Dec 2017 18:56:51 -0600 (CST)
Christopher Lameter <[email protected]> wrote:
> On Tue, 19 Dec 2017, Rao Shoaib wrote:
>
> > > > mm/slab_common.c
> > > It would be great to have separate patches so that we can review it
> > > properly:
> > >
> > > 1. Move the code into slab_common.c
> > > 2. The actual code changes to the kfree rcu mechanism
> > > 3. The whitespace changes
>
> > I can certainly break down the patch and submit smaller patches as you have
> > suggested.
> >
> > BTW -- This is my first ever patch to Linux, so I am still learning the
> > etiquette.
>
> You are doing great. Keep at improving the patches and we will get your
> changes into the kernel source. If you want to send your first patchset
> then a tool like "quilt" or "git" might be helpful.
When working with patchsets (multiple separate patches, as requested
here), I personally prefer using the tool called Stacked Git[1] (StGit)
command line 'stg', as it allows me to easily adjust patches in the
middle of the patchset "stack". It is similar to quilt, just git based
itself.
I guess most people on this list use 'git rebase --interactive' when
updating their patchsets (?)
[1] http://procode.org/stgit/doc/tutorial.html
--
Best regards,
Jesper Dangaard Brouer
MSc.CS, Principal Kernel Engineer at Red Hat
LinkedIn: http://www.linkedin.com/in/brouer