2024-05-20 15:15:18

by Wen Yang

[permalink] [raw]
Subject: [PATCH v2] eventfd: introduce ratelimited wakeup for non-semaphore eventfd

For a NON-SEMAPHORE eventfd, a write(2) call adds the 8-byte integer
value provided in its buffer to the counter, while a read(2) returns the
8-byte counter value and resets the counter to 0.
Therefore, the accumulated value of multiple writes can be retrieved by a
single read.

However, eventfd_write currently wakes up the reading thread immediately
after every write to a NON-SEMAPHORE eventfd, which causes unnecessary CPU
overhead. By introducing a configurable rate limiting mechanism in
eventfd_write, these unnecessary wake-up operations are reduced.

We may use the following test code:
#define _GNU_SOURCE
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <getopt.h>
#include <pthread.h>
#include <poll.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/eventfd.h>
#include <sys/prctl.h>
#include <sys/ioctl.h>

/*
 * Local copy of the QoS parameter block exchanged with the kernel through
 * the EFD_IOC_SET_QOS / EFD_IOC_GET_QOS ioctls.
 */
struct eventfd_qos {
/* Set from the -c command-line option. */
__u32 token_capacity;
/* Set from the -r command-line option. */
__u32 token_rate;
};

#define EFD_IOC_SET_QOS _IOW('E', 0, struct eventfd_qos)
#define EFD_IOC_GET_QOS _IOR('E', 0, struct eventfd_qos)

/* Arguments handed to publish(). */
struct pub_param {
/* eventfd descriptor to write to. */
int fd;
/* CPU the publisher pins itself to. */
int cpu;
/* QoS settings to apply with EFD_IOC_SET_QOS, or NULL to skip the ioctl. */
struct eventfd_qos *qos;
};

/* Arguments handed to subscribe(). */
struct sub_param {
/* eventfd descriptor to poll and read. */
int fd;
/* CPU the subscriber pins itself to. */
int cpu;
};

/*
 * Publisher side of the benchmark.
 *
 * Names the process "publish", pins it to the CPU given in the parameters,
 * applies the QoS settings when some were supplied, and then writes to the
 * eventfd in an endless loop.
 *
 * @data: pointer to a struct pub_param.
 *
 * Returns only when the QoS ioctl fails.
 */
static void publish(void *data)
{
	struct pub_param *args = data;
	unsigned long long increment = 1;
	cpu_set_t mask;

	prctl(PR_SET_NAME,"publish");

	CPU_ZERO(&mask);
	CPU_SET(args->cpu, &mask);
	sched_setaffinity(0, sizeof(mask), &mask);

	/* The QoS block is optional; without one the ioctl is skipped. */
	if (args->qos && ioctl(args->fd, EFD_IOC_SET_QOS, args->qos) == -1) {
		printf("ioctl failed, error=%s\n", strerror(errno));
		return;
	}

	for (;;) {
		if (eventfd_write(args->fd, increment) < 0)
			printf("XXX: write failed, %s\n", strerror(errno));
	}
}

/*
 * Subscriber side of the benchmark.
 *
 * Names the process "subscribe", pins it to the CPU given in the parameters,
 * then waits for the eventfd to become readable and reads its counter, in an
 * endless loop.
 *
 * @data: pointer to a struct sub_param.
 *
 * Returns only on an unrecoverable poll() or read() failure, so that such a
 * failure is reported instead of turning the loop into a silent busy spin.
 */
static void subscribe(void *data)
{
	struct sub_param *param = (struct sub_param *)data;
	unsigned long long value = 0;
	struct pollfd pfds[1];
	cpu_set_t cpuset;
	ssize_t nread;

	prctl(PR_SET_NAME,"subscribe");
	CPU_ZERO(&cpuset);
	CPU_SET(param->cpu, &cpuset);
	sched_setaffinity(0, sizeof(cpuset), &cpuset);

	pfds[0].fd = param->fd;
	pfds[0].events = POLLIN;

	while (1) {
		if (poll(pfds, 1, -1) < 0) {
			/* Interrupted by a signal: simply wait again. */
			if (errno == EINTR)
				continue;
			printf("XXX: poll failed, %s\n", strerror(errno));
			return;
		}

		/* An error-only result would make poll() return at once forever. */
		if (pfds[0].revents & (POLLERR | POLLNVAL)) {
			printf("XXX: poll reported an error on the eventfd\n");
			return;
		}

		if (pfds[0].revents & POLLIN) {
			nread = read(param->fd, &value, sizeof(value));
			/* EAGAIN/EINTR are transient; anything else is fatal. */
			if (nread < 0 && errno != EAGAIN && errno != EINTR) {
				printf("XXX: read failed, %s\n", strerror(errno));
				return;
			}
		}
	}
}

/* Print the command-line synopsis of the benchmark to stdout. */
static void usage(void)
{
	fputs("Usage: \n", stdout);
	fputs("\t", stdout);
	fputs("<-p cpuid> <-s cpuid > [ -r rate ] [ -c capacity ] \n", stdout);
}

/*
 * Benchmark driver: creates one non-blocking eventfd, forks, and runs the
 * subscriber in the child and the publisher in the parent.
 *
 * Options:
 *   -p cpuid     CPU for the publisher
 *   -s cpuid     CPU for the subscriber
 *   -r rate      QoS token rate
 *   -c capacity  QoS token capacity
 * The QoS ioctl is issued only when both rate and capacity are non-zero.
 *
 * Returns 1 on a usage error, -1 when fork() fails, 0 otherwise; exits with
 * status 1 when the eventfd cannot be created.
 */
int main(int argc, char *argv[])
{
	/*
	 * -r and -c take a required argument. With the optional-argument form
	 * ("r::") a separated value such as "-r 1000" leaves optarg NULL, and
	 * atoi(NULL) crashes. "-r1000" keeps working with the required form.
	 */
	char *optstr = "p:s:r:c:";
	struct sub_param sub_param = {0};
	struct pub_param pub_param = {0};
	struct eventfd_qos qos = {0};
	pid_t pid;
	int fd;
	int opt;

	if (argc < 3) {
		usage();
		return 1;
	}

	while ((opt = getopt(argc, argv, optstr)) != -1) {
		switch (opt) {
		case 'p':
			pub_param.cpu = atoi(optarg);
			break;
		case 's':
			sub_param.cpu = atoi(optarg);
			break;
		case 'r':
			qos.token_rate = atoi(optarg);
			break;
		case 'c':
			qos.token_capacity = atoi(optarg);
			break;
		case '?':
		default:
			usage();
			return 1;
		}
	}

	fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
	/* eventfd() reports failure with -1; descriptor 0 is a valid result. */
	if (fd == -1)
		err(1, "eventfd");

	sub_param.fd = fd;
	pub_param.fd = fd;
	pub_param.qos = (qos.token_capacity && qos.token_rate) ? &qos : NULL;

	pid = fork();
	if (pid == 0)
		subscribe(&sub_param);
	else if (pid > 0)
		publish(&pub_param);
	else {
		printf("XXX: fork error!\n");
		return -1;
	}

	return 0;
}

# ./a.out -p 2 -s 3
The original cpu usage is as follows:
09:53:38 PM CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle
09:53:40 PM 2 47.26 0.00 52.74 0.00 0.00 0.00 0.00 0.00 0.00 0.00
09:53:40 PM 3 44.72 0.00 55.28 0.00 0.00 0.00 0.00 0.00 0.00 0.00

09:53:40 PM CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle
09:53:42 PM 2 45.73 0.00 54.27 0.00 0.00 0.00 0.00 0.00 0.00 0.00
09:53:42 PM 3 46.00 0.00 54.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00

09:53:42 PM CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle
09:53:44 PM 2 48.00 0.00 52.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
09:53:44 PM 3 45.50 0.00 54.50 0.00 0.00 0.00 0.00 0.00 0.00 0.00

Then enable the ratelimited wakeup, eg:
# ./a.out -p 2 -s 3 -r1000 -c2

We observe a decrease of over 20 percentage points in %sys CPU utilization (CPU #3, 54% -> 30%), as shown below:
10:02:32 PM CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle
10:02:34 PM 2 53.00 0.00 47.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
10:02:34 PM 3 30.81 0.00 30.81 0.00 0.00 0.00 0.00 0.00 0.00 38.38

10:02:34 PM CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle
10:02:36 PM 2 48.50 0.00 51.50 0.00 0.00 0.00 0.00 0.00 0.00 0.00
10:02:36 PM 3 30.20 0.00 30.69 0.00 0.00 0.00 0.00 0.00 0.00 39.11

10:02:36 PM CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle
10:02:38 PM 2 45.00 0.00 55.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
10:02:38 PM 3 27.08 0.00 30.21 0.00 0.00 0.00 0.00 0.00 0.00 42.71

Signed-off-by: Wen Yang <[email protected]>
Cc: Al Viro <[email protected]>
Cc: Jens Axboe <[email protected]>
Cc: Christian Brauner <[email protected]>
Cc: Christoph Hellwig <[email protected]>
Cc: Jan Kara <[email protected]>
Cc: Dylan Yudaken <[email protected]>
Cc: David Woodhouse <[email protected]>
Cc: Paolo Bonzini <[email protected]>
Cc: Dave Young <[email protected]>
Cc: kernel test robot <[email protected]>
Cc: [email protected]
Cc: [email protected]
---
v2: fix the build errors reported by kernel test robot

fs/eventfd.c | 188 ++++++++++++++++++++++++++++++++++-
include/uapi/linux/eventfd.h | 8 ++
init/Kconfig | 18 ++++
3 files changed, 213 insertions(+), 1 deletion(-)

diff --git a/fs/eventfd.c b/fs/eventfd.c
index 9afdb722fa92..a6161ba73f94 100644
--- a/fs/eventfd.c
+++ b/fs/eventfd.c
@@ -27,6 +27,15 @@

static DEFINE_IDA(eventfd_ida);

+#ifdef CONFIG_EVENTFD_RATELIMITED_WAKEUP
+struct eventfd_bucket {
+ struct eventfd_qos qos; /* rate and capacity configured via EFD_IOC_SET_QOS */
+ struct hrtimer timer; /* runs eventfd_timer_handler() to deliver a deferred wakeup */
+ u64 timestamp; /* ktime_get_ns() at creation, last refill or last QoS update */
+ u64 tokens; /* tokens left; each immediate wakeup consumes one */
+};
+#endif
+
struct eventfd_ctx {
struct kref kref;
wait_queue_head_t wqh;
@@ -41,8 +50,97 @@ struct eventfd_ctx {
__u64 count;
unsigned int flags;
int id;
+#ifdef CONFIG_EVENTFD_RATELIMITED_WAKEUP
+ struct eventfd_bucket bucket;
+#endif
};

+#ifdef CONFIG_EVENTFD_RATELIMITED_WAKEUP
+
+static void eventfd_refill_tokens(struct eventfd_bucket *bucket)
+{
+	unsigned int rate = bucket->qos.token_rate;
+	u64 now = ktime_get_ns();
+	u64 tokens;
+
+	/* Add the tokens earned since the last refill, saturating at capacity. */
+	tokens = ktime_sub(now, bucket->timestamp) * rate;
+	do_div(tokens, NSEC_PER_SEC);
+	tokens += bucket->tokens;
+	if (tokens > bucket->qos.token_capacity)
+		tokens = bucket->qos.token_capacity;
+	bucket->tokens = tokens;
+	bucket->timestamp = now;
+}
+
+static int eventfd_consume_tokens(struct eventfd_bucket *bucket)
+{
+	/* Take one token if any is left; the result says whether one was taken. */
+	if (!bucket->tokens)
+		return 0;
+	bucket->tokens--;
+	return 1;
+}
+
+static bool eventfd_detect_storm(struct eventfd_ctx *ctx)
+{
+	struct eventfd_bucket *bucket = &ctx->bucket;
+
+	/* A zero token_rate means no ratelimit is configured: never a storm. */
+	if (!bucket->qos.token_rate)
+		return false;
+	eventfd_refill_tokens(bucket);
+	return eventfd_consume_tokens(bucket) == 0;
+}
+
+static enum hrtimer_restart eventfd_timer_handler(struct hrtimer *timer)
+{
+ struct eventfd_ctx *ctx;
+ unsigned long flags;
+
+ ctx = container_of(timer, struct eventfd_ctx, bucket.timer);
+ spin_lock_irqsave(&ctx->wqh.lock, flags);
+
+ /*
+ * Deliver the wakeup that eventfd_ratelimited_wake_up() deferred. Both the
+ * waitqueue_active() check and wake_up_locked_poll() run under ctx->wqh.lock.
+ */
+ if (waitqueue_active(&ctx->wqh))
+ wake_up_locked_poll(&ctx->wqh, EPOLLIN);
+
+ spin_unlock_irqrestore(&ctx->wqh.lock, flags);
+ eventfd_ctx_put(ctx); /* drop the reference taken when the timer was armed */
+
+ return HRTIMER_NORESTART;
+}
+
+static void eventfd_ratelimited_wake_up(struct eventfd_ctx *ctx)
+{
+	u32 rate = ctx->bucket.qos.token_rate;
+	u64 now = ktime_get_ns();
+	u64 slack_ns;
+	u64 expires;
+
+	if (likely(rate)) {
+		slack_ns = NSEC_PER_SEC/rate;
+	} else {
+		WARN_ONCE(1, "eventfd: zero token_rate, deferring wakeup by 1ms\n");
+		slack_ns = NSEC_PER_MSEC;
+	}
+
+	/* A deferred wakeup is already pending; it will cover this write too. */
+	if (hrtimer_is_queued(&ctx->bucket.timer))
+		return;
+
+	/* Defer the wakeup by one token period; the handler drops the kref. */
+	expires = now + slack_ns;
+
+	kref_get(&ctx->kref);
+	hrtimer_start(&ctx->bucket.timer, expires, HRTIMER_MODE_ABS);
+}
+
+#endif
+
/**
* eventfd_signal_mask - Increment the event counter
* @ctx: [in] Pointer to the eventfd context.
@@ -270,8 +368,23 @@ static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t c
if (likely(res > 0)) {
ctx->count += ucnt;
current->in_eventfd = 1;
- if (waitqueue_active(&ctx->wqh))
+
+ /*
+ * Checking for locked entry and wake_up_locked_poll() happens
+ * under the ctx->wqh.lock spinlock
+ */
+ if (waitqueue_active(&ctx->wqh)) {
+#ifdef CONFIG_EVENTFD_RATELIMITED_WAKEUP
+ if ((ctx->flags & EFD_SEMAPHORE) || !eventfd_detect_storm(ctx))
+ wake_up_locked_poll(&ctx->wqh, EPOLLIN);
+ else
+ eventfd_ratelimited_wake_up(ctx);
+
+#else
wake_up_locked_poll(&ctx->wqh, EPOLLIN);
+#endif
+ }
+
current->in_eventfd = 0;
}
spin_unlock_irq(&ctx->wqh.lock);
@@ -299,6 +412,66 @@ static void eventfd_show_fdinfo(struct seq_file *m, struct file *f)
}
#endif

+#ifdef CONFIG_EVENTFD_RATELIMITED_WAKEUP
+static long eventfd_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
+{
+	struct eventfd_ctx *ctx = file->private_data;
+	void __user *uaddr = (void __user *)arg;
+	struct eventfd_qos qos;
+	int cancelled;
+
+	if (ctx->flags & EFD_SEMAPHORE)
+		return -EINVAL;
+	if (!uaddr)
+		return -EINVAL;
+
+	switch (cmd) {
+	case EFD_IOC_SET_QOS:
+		if (copy_from_user(&qos, uaddr, sizeof(qos)))
+			return -EFAULT;
+		if (qos.token_rate > NSEC_PER_SEC)
+			return -EINVAL;
+
+		/* Stop any deferred wakeup; wait if its handler is running. */
+		for (;;) {
+			spin_lock_irq(&ctx->wqh.lock);
+			cancelled = hrtimer_try_to_cancel(&ctx->bucket.timer);
+			spin_unlock_irq(&ctx->wqh.lock);
+			if (cancelled >= 0)
+				break;
+			hrtimer_cancel_wait_running(&ctx->bucket.timer);
+		}
+		/* A cancelled timer never runs its handler: drop its reference. */
+		if (cancelled > 0)
+			eventfd_ctx_put(ctx);
+
+		spin_lock_irq(&ctx->wqh.lock);
+		ctx->bucket.timestamp = ktime_get_ns();
+		ctx->bucket.qos = qos;
+		ctx->bucket.tokens = qos.token_capacity;
+
+		/* Events may be pending whose deferred wakeup was just cancelled. */
+		current->in_eventfd = 1;
+		if (ctx->count && waitqueue_active(&ctx->wqh))
+			wake_up_locked_poll(&ctx->wqh, EPOLLIN);
+		current->in_eventfd = 0;
+
+		spin_unlock_irq(&ctx->wqh.lock);
+		return 0;
+
+	case EFD_IOC_GET_QOS:
+		qos = READ_ONCE(ctx->bucket.qos);
+		if (copy_to_user(uaddr, &qos, sizeof(qos)))
+			return -EFAULT;
+		return 0;
+
+	default:
+		/* Unknown ioctl commands are reported with ENOTTY. */
+		return -ENOTTY;
+	}
+}
+#endif
+
static const struct file_operations eventfd_fops = {
#ifdef CONFIG_PROC_FS
.show_fdinfo = eventfd_show_fdinfo,
@@ -308,6 +481,10 @@ static const struct file_operations eventfd_fops = {
.read_iter = eventfd_read,
.write = eventfd_write,
.llseek = noop_llseek,
+#ifdef CONFIG_EVENTFD_RATELIMITED_WAKEUP
+ .unlocked_ioctl = eventfd_ioctl,
+ .compat_ioctl = eventfd_ioctl,
+#endif
};

/**
@@ -403,6 +580,15 @@ static int do_eventfd(unsigned int count, int flags)
ctx->flags = flags;
ctx->id = ida_alloc(&eventfd_ida, GFP_KERNEL);

+#ifdef CONFIG_EVENTFD_RATELIMITED_WAKEUP
+ ctx->bucket.qos.token_rate = 0;
+ ctx->bucket.qos.token_capacity = 0;
+ ctx->bucket.tokens = 0;
+ ctx->bucket.timestamp = ktime_get_ns();
+ hrtimer_init(&ctx->bucket.timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
+ ctx->bucket.timer.function = eventfd_timer_handler;
+#endif
+
flags &= EFD_SHARED_FCNTL_FLAGS;
flags |= O_RDWR;
fd = get_unused_fd_flags(flags);
diff --git a/include/uapi/linux/eventfd.h b/include/uapi/linux/eventfd.h
index 2eb9ab6c32f3..8e9d5361ec6a 100644
--- a/include/uapi/linux/eventfd.h
+++ b/include/uapi/linux/eventfd.h
@@ -8,4 +8,12 @@
#define EFD_CLOEXEC O_CLOEXEC
#define EFD_NONBLOCK O_NONBLOCK

+struct eventfd_qos {
+ __u32 token_capacity; /* tokens the bucket holds right after EFD_IOC_SET_QOS */
+ __u32 token_rate; /* tokens refilled per second; 0 disables ratelimiting */
+};
+
+#define EFD_IOC_SET_QOS _IOW('E', 0, struct eventfd_qos)
+#define EFD_IOC_GET_QOS _IOR('E', 0, struct eventfd_qos)
+
#endif /* _UAPI_LINUX_EVENTFD_H */
diff --git a/init/Kconfig b/init/Kconfig
index 0a021d6b4939..ebfc79ff34ca 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -1646,6 +1646,24 @@ config EVENTFD

If unsure, say Y.

+config EVENTFD_RATELIMITED_WAKEUP
+ bool "support ratelimited wakeups for the NON-SEMAPHORE eventfd" if EXPERT
+ default n
+ depends on EVENTFD
+ help
+ This option enables the ratelimited wakeups for the non-semaphore
+ eventfd. Frequent writing to an eventfd can lead to frequent wakeup
+ of processes waiting for reading on this eventfd, resulting in
+ significant overhead. However, for the NON-SEMAPHORE eventfd, if its
+ counter has a non-zero value, read (2) returns 8 bytes containing
+ that value, and the counter value is reset to zero. This means that
+ a read operation can retrieve the accumulated value caused by
+ multiple write operations.
+ By introducing the ratelimited wakeups for the NON-SEMAPHORE eventfd,
+ this CPU overhead can be reduced.
+
+ If unsure, say N.
+
config SHMEM
bool "Use full shmem filesystem" if EXPERT
default y
--
2.25.1