2012-05-01 19:08:40

by Doug Ledford

Subject: [Patch 0/4] ipc/mqueue improvements

While working on the POSIX message queue subsystem a while back I
noticed some things that could stand to be improved (as it turns
out, drastically so). So I improved them. The first patch fixes
the POSIX message queue subsystem to not suck under heavy load and
high queue depths. The next two are minor touchups/fixes. The
fourth is in the tools/selftests directory and is the app I used
to do the performance testing of the mqueue subsystem.

Considering that the changes I've made here make as much as a
1000-fold difference in performance, it would be nice to see them
go into linux-next ;-)

Special note: these patches require and are built on top of my
previous patches. Applying these to a tree that does not also
have the previous series of 7 patches I sent related to the
mqueue subsystem will result in a broken tree that won't compile.


--

Doug Ledford <[email protected]>


2012-05-01 19:08:31

by Doug Ledford

Subject: [Patch 2/4] ipc/mqueue: correct mq_attr_ok test

While working on the other parts of the mqueue stuff, I noticed that
the calculation for overflow in mq_attr_ok didn't actually match
reality (this is especially true since my last patch which changed
how we account memory slightly).

In particular, we used to test for overflow using:
msgs * msgsize + msgs * sizeof(struct msg_msg *)

That was never really correct because each message we allocate via
load_msg() is actually a struct msg_msg followed by the data for
the message (and if struct msg_msg + data exceeds PAGE_SIZE we end
up allocating struct msg_msgseg structs too, but accounting for them
would get really tedious, so let's ignore those...they're only a
pointer in size anyway). This patch updates the calculation to be
more accurate with regard to the maximum possible memory consumption
by the mqueue.
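
To make the arithmetic concrete, here is a rough userspace sketch of the
worst-case accounting described above. The two struct sizes are placeholder
assumptions (the real values depend on the kernel build), and
mq_worst_case_bytes() is a hypothetical name, not a function in ipc/mqueue.c.
Unlike the kernel check, the sketch tests each step for wraparound before
performing it.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>

/* Placeholder sizes: assumptions for illustration, not kernel values. */
#define SKETCH_MSG_MSG_SIZE   48UL    /* assumed sizeof(struct msg_msg) */
#define SKETCH_TREE_NODE_SIZE 48UL    /* assumed sizeof(struct posix_msg_tree_node) */
#define SKETCH_MQ_PRIO_MAX    32768UL /* same value as the kernel's MQ_PRIO_MAX */

/*
 * Compute the worst-case byte count for a queue of maxmsg messages of
 * msgsize bytes each: payload, one message header per message, and one
 * tree node per priority that could be in use.
 * Returns 1 and stores the result in *total, or 0 if it cannot be
 * represented in an unsigned long.
 */
static int mq_worst_case_bytes(unsigned long maxmsg, unsigned long msgsize,
                               unsigned long *total)
{
    unsigned long payload, nodes, overhead;

    assert(total != NULL);
    if (maxmsg == 0 || msgsize == 0)
        return 0;

    /* payload = maxmsg * msgsize, checked before multiplying */
    if (msgsize > ULONG_MAX / maxmsg)
        return 0;
    payload = maxmsg * msgsize;

    /* at most one tree node per message, capped at the priority count */
    nodes = maxmsg < SKETCH_MQ_PRIO_MAX ? maxmsg : SKETCH_MQ_PRIO_MAX;
    overhead = nodes * SKETCH_TREE_NODE_SIZE;   /* small, cannot wrap */

    /* add one message header per message, checked before multiplying */
    if (maxmsg > (ULONG_MAX - overhead) / SKETCH_MSG_MSG_SIZE)
        return 0;
    overhead += maxmsg * SKETCH_MSG_MSG_SIZE;

    if (payload > ULONG_MAX - overhead)
        return 0;
    *total = payload + overhead;
    return 1;
}
```

With these assumed sizes, 10 messages of 16 bytes come to 160 bytes of
payload plus 480 bytes of headers plus 480 bytes of tree nodes.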

Signed-off-by: Doug Ledford <[email protected]>
---
ipc/mqueue.c | 9 +++++++--
1 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index fd49cfc..4b2892e 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -670,6 +670,8 @@ static void remove_notification(struct mqueue_inode_info *info)

static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr)
{
+ int mq_treesize;
+
if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
return 0;
if (capable(CAP_SYS_RESOURCE)) {
@@ -684,8 +686,11 @@ static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr)
/* check for overflow */
if (attr->mq_msgsize > ULONG_MAX/attr->mq_maxmsg)
return 0;
- if ((unsigned long)(attr->mq_maxmsg * (attr->mq_msgsize
- + sizeof (struct msg_msg *))) <
+ mq_treesize = attr->mq_maxmsg * sizeof(struct msg_msg) +
+ min_t(unsigned int, attr->mq_maxmsg, MQ_PRIO_MAX) *
+ sizeof(struct posix_msg_tree_node);
+ if ((unsigned long)(attr->mq_maxmsg * attr->mq_msgsize +
+ mq_treesize) <
(unsigned long)(attr->mq_maxmsg * attr->mq_msgsize))
return 0;
return 1;
--
1.7.7.6

2012-05-01 19:08:29

by Doug Ledford

Subject: [Patch 3/4] ipc/mqueue: strengthen checks on mqueue creation

We already check the mq attr struct if it's passed in, but now that the
admin can set system wide defaults separate from maximums, it's actually
possible to set the defaults to something that would overflow. So,
if there is no attr struct passed in to the open call, check the default
values.

While we are at it, simplify mq_attr_ok() by making it return 0 or an
error code. That way, if we add more tests to it later, mq_attr_ok()
itself can choose which error to return instead of the caller having
to pick a possibly inaccurate error code.
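
The calling convention described here can be sketched in userspace as
follows. This is only an illustration under assumptions: the sketch_* types
and functions are hypothetical stand-ins for struct mq_attr, the ipc
namespace limits, mq_attr_ok() and do_create(), and the CAP_SYS_RESOURCE
branch and the tree-size term are left out.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stddef.h>

/* Hypothetical stand-ins for struct mq_attr and the namespace limits. */
struct sketch_attr { long maxmsg; long msgsize; };
struct sketch_limits {
    long msg_max, msgsize_max;          /* administrator maximums */
    long msg_default, msgsize_default;  /* administrator defaults */
};

static long sketch_min(long a, long b) { return a < b ? a : b; }

/* Returns 0 if the attributes are acceptable, else a negative errno. */
static int sketch_attr_ok(const struct sketch_limits *lim,
                          const struct sketch_attr *attr)
{
    assert(lim != NULL && attr != NULL);
    if (attr->maxmsg <= 0 || attr->msgsize <= 0)
        return -EINVAL;
    if (attr->maxmsg > lim->msg_max || attr->msgsize > lim->msgsize_max)
        return -EINVAL;
    /* the size computation must not wrap */
    if ((unsigned long)attr->msgsize > ULONG_MAX / (unsigned long)attr->maxmsg)
        return -ENOMEM;
    return 0;
}

/* The caller passes the validator's error straight through. */
static int sketch_create(const struct sketch_limits *lim,
                         const struct sketch_attr *attr)
{
    struct sketch_attr def;

    if (!attr) {
        /* no attr given: validate the defaults, clamped to the maximums */
        def.maxmsg = sketch_min(lim->msg_max, lim->msg_default);
        def.msgsize = sketch_min(lim->msgsize_max, lim->msgsize_default);
        attr = &def;
    }
    return sketch_attr_ok(lim, attr);
}
```

The point of the design is that the creation path no longer needs to know
why validation failed; it just returns whatever the validator reported.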

Signed-off-by: Doug Ledford <[email protected]>
---
ipc/mqueue.c | 27 ++++++++++++++++++---------
1 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 4b2892e..6089f73 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -673,27 +673,27 @@ static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr)
int mq_treesize;

if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
- return 0;
+ return -EINVAL;
if (capable(CAP_SYS_RESOURCE)) {
if (attr->mq_maxmsg > HARD_MSGMAX ||
attr->mq_msgsize > HARD_MSGSIZEMAX)
- return 0;
+ return -EINVAL;
} else {
if (attr->mq_maxmsg > ipc_ns->mq_msg_max ||
attr->mq_msgsize > ipc_ns->mq_msgsize_max)
- return 0;
+ return -EINVAL;
}
/* check for overflow */
if (attr->mq_msgsize > ULONG_MAX/attr->mq_maxmsg)
- return 0;
+ return -ENOMEM;
mq_treesize = attr->mq_maxmsg * sizeof(struct msg_msg) +
min_t(unsigned int, attr->mq_maxmsg, MQ_PRIO_MAX) *
sizeof(struct posix_msg_tree_node);
if ((unsigned long)(attr->mq_maxmsg * attr->mq_msgsize +
mq_treesize) <
(unsigned long)(attr->mq_maxmsg * attr->mq_msgsize))
- return 0;
- return 1;
+ return -ENOMEM;
+ return 0;
}

/*
@@ -708,12 +708,21 @@ static struct file *do_create(struct ipc_namespace *ipc_ns, struct dentry *dir,
int ret;

if (attr) {
- if (!mq_attr_ok(ipc_ns, attr)) {
- ret = -EINVAL;
+ ret = mq_attr_ok(ipc_ns, attr);
+ if (ret)
goto out;
- }
/* store for use during create */
dentry->d_fsdata = attr;
+ } else {
+ struct mq_attr def_attr;
+
+ def_attr.mq_maxmsg = min(ipc_ns->mq_msg_max,
+ ipc_ns->mq_msg_default);
+ def_attr.mq_msgsize = min(ipc_ns->mq_msgsize_max,
+ ipc_ns->mq_msgsize_default);
+ ret = mq_attr_ok(ipc_ns, &def_attr);
+ if (ret)
+ goto out;
}

mode &= ~current_umask();
--
1.7.7.6

2012-05-01 19:09:13

by Doug Ledford

Subject: [Patch 4/4] tools/selftests: add mq_perf_tests

Add the mq_perf_tests tool I used when creating my mq performance patch.
Also add a local .gitignore to keep the binaries from showing up in
git status output.

Signed-off-by: Doug Ledford <[email protected]>
---
tools/testing/selftests/mqueue/.gitignore | 2 +
tools/testing/selftests/mqueue/Makefile | 4 +-
tools/testing/selftests/mqueue/mq_perf_tests.c | 734 ++++++++++++++++++++++++
3 files changed, 739 insertions(+), 1 deletions(-)
create mode 100644 tools/testing/selftests/mqueue/.gitignore
create mode 100644 tools/testing/selftests/mqueue/mq_perf_tests.c

diff --git a/tools/testing/selftests/mqueue/.gitignore b/tools/testing/selftests/mqueue/.gitignore
new file mode 100644
index 0000000..d8d4237
--- /dev/null
+++ b/tools/testing/selftests/mqueue/.gitignore
@@ -0,0 +1,2 @@
+mq_open_tests
+mq_perf_tests
diff --git a/tools/testing/selftests/mqueue/Makefile b/tools/testing/selftests/mqueue/Makefile
index bd74142..54c0aad 100644
--- a/tools/testing/selftests/mqueue/Makefile
+++ b/tools/testing/selftests/mqueue/Makefile
@@ -1,8 +1,10 @@
all:
gcc -O2 -lrt mq_open_tests.c -o mq_open_tests
+ gcc -O2 -lrt -lpthread -lpopt -o mq_perf_tests mq_perf_tests.c

run_tests:
./mq_open_tests /test1
+ ./mq_perf_tests

clean:
- rm -f mq_open_tests
+ rm -f mq_open_tests mq_perf_tests
diff --git a/tools/testing/selftests/mqueue/mq_perf_tests.c b/tools/testing/selftests/mqueue/mq_perf_tests.c
new file mode 100644
index 0000000..0a59c10
--- /dev/null
+++ b/tools/testing/selftests/mqueue/mq_perf_tests.c
@@ -0,0 +1,734 @@
+/*
+ * This application is Copyright 2012 Red Hat, Inc.
+ * Doug Ledford <[email protected]>
+ *
+ * mq_perf_tests is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3.
+ *
+ * mq_perf_tests is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * For the full text of the license, see <http://www.gnu.org/licenses/>.
+ *
+ * mq_perf_tests.c
+ * Tests various types of message queue workloads, concentrating on those
+ * situations that involve large message sizes, large message queue depths,
+ * or both, and reports back useful metrics about kernel message queue
+ * performance.
+ *
+ */
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <limits.h>
+#include <errno.h>
+#include <signal.h>
+#include <pthread.h>
+#include <sched.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <sys/stat.h>
+#include <mqueue.h>
+#include <popt.h>
+
+static char *usage =
+"Usage:\n"
+" %s [-c #[,#..] -f] path\n"
+"\n"
+" -c # Skip most tests and go straight to a high queue depth test\n"
+" and then run that test continuously (useful for running at\n"
+" the same time as some other workload to see how much the\n"
+" cache thrashing caused by adding messages to a very deep\n"
+" queue impacts the performance of other programs). The number\n"
+" indicates which CPU core we should bind the process to during\n"
+" the run. If you have more than one physical CPU, then you\n"
+" will need one copy per physical CPU package, and you should\n"
+" specify the CPU cores to pin ourself to via a comma separated\n"
+" list of CPU values.\n"
+" -f Only usable with continuous mode. Pin ourself to the CPUs\n"
+" as requested, then instead of looping doing a high mq\n"
+" workload, just busy loop. This will allow us to lock up a\n"
+" single CPU just like we normally would, but without actually\n"
+" thrashing the CPU cache. This is to make it easier to get\n"
+" comparable numbers from some other workload running on the\n"
+" other CPUs. One set of numbers with # CPUs locked up running\n"
+" an mq workload, and another set of numbers with those same\n"
+" CPUs locked away from the test workload, but not doing\n"
+" anything to trash the cache like the mq workload might.\n"
+" path Path name of the message queue to create\n"
+"\n"
+" Note: this program must be run as root in order to enable all tests\n"
+"\n";
+
+char *MAX_MSGS = "/proc/sys/fs/mqueue/msg_max";
+char *MAX_MSGSIZE = "/proc/sys/fs/mqueue/msgsize_max";
+
+#define min(a,b) ((a) < (b) ? (a) : (b))
+#define MAX_CPUS 64
+char *cpu_option_string;
+int cpus_to_pin[MAX_CPUS];
+int num_cpus_to_pin;
+pthread_t cpu_threads[MAX_CPUS];
+pthread_t main_thread;
+cpu_set_t *cpu_set;
+int cpu_set_size;
+int cpus_online;
+
+#define MSG_SIZE 16
+#define TEST1_LOOPS 10000000
+#define TEST2_LOOPS 100000
+int continuous_mode;
+int continuous_mode_fake;
+
+struct rlimit saved_limits, cur_limits;
+int saved_max_msgs, saved_max_msgsize;
+int cur_max_msgs, cur_max_msgsize;
+FILE *max_msgs, *max_msgsize;
+int cur_nice;
+char *queue_path = "/mq_perf_tests";
+mqd_t queue = -1;
+struct mq_attr result;
+int mq_prio_max;
+
+const struct poptOption options[] =
+{
+ {
+ .longName = "continuous",
+ .shortName = 'c',
+ .argInfo = POPT_ARG_STRING,
+ .arg = &cpu_option_string,
+ .val = 'c',
+ .descrip = "Run continuous tests at a high queue depth in "
+ "order to test the effects of cache thrashing on "
+ "other tasks on the system. This test is intended "
+ "to be run on one core of each physical CPU while "
+ "some other CPU intensive task is run on all the other "
+ "cores of that same physical CPU and the other task "
+ "is timed. It is assumed that the process of adding "
+ "messages to the message queue in a tight loop will "
+ "impact that other task to some degree. Once the "
+ "tests are performed in this way, you should then "
+ "re-run the tests using fake mode in order to check "
+ "the difference in time required to perform the CPU "
+ "intensive task",
+ .argDescrip = "cpu[,cpu]",
+ },
+ {
+ .longName = "fake",
+ .shortName = 'f',
+ .argInfo = POPT_ARG_NONE,
+ .arg = &continuous_mode_fake,
+ .val = 0,
+ .descrip = "Tie up the CPUs that we would normally tie up in "
+ "continuous mode, but don't actually do any mq stuff, "
+ "just keep the CPU busy so it can't be used to process "
+ "system level tasks as this would free up resources on "
+ "the other CPU cores and skew the comparison between "
+ "the no-mqueue work and mqueue work tests",
+ .argDescrip = NULL,
+ },
+ {
+ .longName = "path",
+ .shortName = 'p',
+ .argInfo = POPT_ARG_STRING | POPT_ARGFLAG_SHOW_DEFAULT,
+ .arg = &queue_path,
+ .val = 'p',
+ .descrip = "The name of the path to use in the mqueue "
+ "filesystem for our tests",
+ .argDescrip = "pathname",
+ },
+ POPT_AUTOHELP
+ POPT_TABLEEND
+};
+
+static inline void __set(FILE *stream, int value, char *err_msg);
+void shutdown(int exit_val, char *err_cause, int line_no);
+void sig_action_SIGUSR1(int signum, siginfo_t *info, void *context);
+void sig_action(int signum, siginfo_t *info, void *context);
+static inline int get(FILE *stream);
+static inline void set(FILE *stream, int value);
+static inline int try_set(FILE *stream, int value);
+static inline void getr(int type, struct rlimit *rlim);
+static inline void setr(int type, struct rlimit *rlim);
+static inline void open_queue(struct mq_attr *attr);
+void increase_limits(void);
+
+static inline void __set(FILE *stream, int value, char *err_msg)
+{
+ rewind(stream);
+ if (fprintf(stream, "%d", value) < 0)
+ perror(err_msg);
+}
+
+
+void shutdown(int exit_val, char *err_cause, int line_no)
+{
+ static int in_shutdown = 0;
+ int errno_at_shutdown = errno;
+ int i;
+
+ /* In case we get called by multiple threads or from an sighandler */
+ if (in_shutdown++)
+ return;
+
+ for (i=0; i < num_cpus_to_pin; i++)
+ if (cpu_threads[i]) {
+ pthread_kill(cpu_threads[i], SIGUSR1);
+ pthread_join(cpu_threads[i], NULL);
+ }
+
+ if (queue != -1)
+ if (mq_close(queue))
+ perror("mq_close() during shutdown");
+ if (queue_path)
+ /*
+ * Be silent if this fails, if we cleaned up already it's
+ * expected to fail
+ */
+ mq_unlink(queue_path);
+ if (saved_max_msgs)
+ __set(max_msgs, saved_max_msgs,
+ "failed to restore saved_max_msgs");
+ if (saved_max_msgsize)
+ __set(max_msgsize, saved_max_msgsize,
+ "failed to restore saved_max_msgsize");
+ if (exit_val)
+ error(exit_val, errno_at_shutdown, "%s at %d",
+ err_cause, line_no);
+ exit(0);
+}
+
+void sig_action_SIGUSR1(int signum, siginfo_t *info, void *context)
+{
+ if (pthread_self() != main_thread)
+ pthread_exit(0);
+ else {
+ fprintf(stderr, "Caught signal %d in SIGUSR1 handler, "
+ "exiting\n", signum);
+ shutdown(0, "", 0);
+ fprintf(stderr, "\n\nReturned from shutdown?!?!\n\n");
+ exit(0);
+ }
+}
+
+void sig_action(int signum, siginfo_t *info, void *context)
+{
+ if (pthread_self() != main_thread)
+ pthread_kill(main_thread, signum);
+ else {
+ fprintf(stderr, "Caught signal %d, exiting\n", signum);
+ shutdown(0, "", 0);
+ fprintf(stderr, "\n\nReturned from shutdown?!?!\n\n");
+ exit(0);
+ }
+}
+
+static inline int get(FILE *stream)
+{
+ int value;
+ rewind(stream);
+ if (fscanf(stream, "%d", &value) != 1)
+ shutdown(4, "Error reading /proc entry", __LINE__);
+ return value;
+}
+
+static inline void set(FILE *stream, int value)
+{
+ int new_value;
+
+ rewind(stream);
+ if (fprintf(stream, "%d", value) < 0)
+ return shutdown(5, "Failed writing to /proc file", __LINE__);
+ new_value = get(stream);
+ if (new_value != value)
+ return shutdown(5, "We didn't get what we wrote to /proc back",
+ __LINE__);
+}
+
+static inline int try_set(FILE *stream, int value)
+{
+ int new_value;
+
+ rewind(stream);
+ fprintf(stream, "%d", value);
+ new_value = get(stream);
+ return new_value == value;
+}
+
+static inline void getr(int type, struct rlimit *rlim)
+{
+ if (getrlimit(type, rlim))
+ shutdown(6, "getrlimit()", __LINE__);
+}
+
+static inline void setr(int type, struct rlimit *rlim)
+{
+ if (setrlimit(type, rlim))
+ shutdown(7, "setrlimit()", __LINE__);
+}
+
+/**
+ * open_queue - open the global queue for testing
+ * @attr - An attr struct specifying the desired queue traits
+ * @result - An attr struct that lists the actual traits the queue has
+ *
+ * This open is not allowed to fail, failure will result in an orderly
+ * shutdown of the program. The global queue_path is used to set what
+ * queue to open, the queue descriptor is saved in the global queue
+ * variable.
+ */
+static inline void open_queue(struct mq_attr *attr)
+{
+ int flags = O_RDWR | O_EXCL | O_CREAT | O_NONBLOCK;
+ int perms = DEFFILEMODE;
+
+ if ((queue = mq_open(queue_path, flags, perms, attr)) == -1)
+ shutdown(1, "mq_open()", __LINE__);
+ if (mq_getattr(queue, &result))
+ shutdown(1, "mq_getattr()", __LINE__);
+ printf("\n\tQueue %s created:\n", queue_path);
+ printf("\t\tmq_flags:\t\t\t%s\n", result.mq_flags & O_NONBLOCK ?
+ "O_NONBLOCK" : "(null)");
+ printf("\t\tmq_maxmsg:\t\t\t%ld\n", result.mq_maxmsg);
+ printf("\t\tmq_msgsize:\t\t\t%ld\n", result.mq_msgsize);
+ printf("\t\tmq_curmsgs:\t\t\t%ld\n", result.mq_curmsgs);
+}
+
+void *fake_cont_thread(void *arg)
+{
+ int i;
+
+ for (i=0; i < num_cpus_to_pin; i++)
+ if (cpu_threads[i] == pthread_self())
+ break;
+ printf("\tStarted fake continuous mode thread %d on CPU %d\n", i,
+ cpus_to_pin[i]);
+ while(1) ;
+}
+
+void *cont_thread(void *arg)
+{
+ char buff[MSG_SIZE];
+ int i, priority;
+
+ for (i=0; i < num_cpus_to_pin; i++)
+ if (cpu_threads[i] == pthread_self())
+ break;
+ printf("\tStarted continuous mode thread %d on CPU %d\n", i,
+ cpus_to_pin[i]);
+ while(1) {
+ while(mq_send(queue, buff, sizeof(buff), 0) == 0);
+ mq_receive(queue, buff, sizeof(buff), &priority);
+ }
+}
+
+#define drain_queue() \
+ while (mq_receive(queue, buff, MSG_SIZE, &prio_in) == MSG_SIZE)
+
+#define do_untimed_send() \
+ do { \
+ if (mq_send(queue, buff, MSG_SIZE, prio_out)) \
+ shutdown(3, "Test send failure", __LINE__); \
+ } while(0)
+
+#define do_send_recv() \
+ do { \
+ clock_gettime(clock, &start); \
+ if (mq_send(queue, buff, MSG_SIZE, prio_out)) \
+ shutdown(3, "Test send failure", __LINE__); \
+ clock_gettime(clock, &middle); \
+ if (mq_receive(queue, buff, MSG_SIZE, &prio_in) != MSG_SIZE) \
+ shutdown(3, "Test receive failure", __LINE__); \
+ clock_gettime(clock, &end); \
+ nsec = ((middle.tv_sec - start.tv_sec) * 1000000000) + \
+ (middle.tv_nsec - start.tv_nsec); \
+ send_total.tv_nsec += nsec; \
+ if (send_total.tv_nsec >= 1000000000) { \
+ send_total.tv_sec++; \
+ send_total.tv_nsec -= 1000000000; \
+ } \
+ nsec = ((end.tv_sec - middle.tv_sec) * 1000000000) + \
+ (end.tv_nsec - middle.tv_nsec); \
+ recv_total.tv_nsec += nsec; \
+ if (recv_total.tv_nsec >= 1000000000) { \
+ recv_total.tv_sec++; \
+ recv_total.tv_nsec -= 1000000000; \
+ } \
+ } while (0)
+
+struct test {
+ char *desc;
+ void (*func)(int *);
+};
+
+void const_prio(int *prio)
+{
+ return;
+}
+
+void inc_prio(int *prio)
+{
+ if (++*prio == mq_prio_max)
+ *prio = 0;
+}
+
+void dec_prio(int *prio)
+{
+ if (--*prio < 0)
+ *prio = mq_prio_max - 1;
+}
+
+void random_prio(int *prio)
+{
+ *prio = random() % mq_prio_max;
+}
+
+struct test test2[] = {
+ {"\n\tTest #2a: Time send/recv message, queue full, constant prio\n",
+ const_prio},
+ {"\n\tTest #2b: Time send/recv message, queue full, increasing prio\n",
+ inc_prio},
+ {"\n\tTest #2c: Time send/recv message, queue full, decreasing prio\n",
+ dec_prio},
+ {"\n\tTest #2d: Time send/recv message, queue full, random prio\n",
+ random_prio},
+ {NULL, NULL}
+};
+
+/**
+ * Tests to perform (all done with MSG_SIZE messages):
+ *
+ * 1) Time to add/remove message with 0 messages on queue
+ * 1a) with constant prio
+ * 2) Time to add/remove message when queue close to capacity:
+ * 2a) with constant prio
+ * 2b) with increasing prio
+ * 2c) with decreasing prio
+ * 2d) with random prio
+ * 3) Test limits of priorities honored (double check _SC_MQ_PRIO_MAX)
+ */
+void *perf_test_thread(void *arg)
+{
+ char buff[MSG_SIZE];
+ int prio_out, prio_in;
+ int i;
+ clockid_t clock;
+ pthread_t *t;
+ struct timespec res, start, middle, end, send_total, recv_total;
+ unsigned long long nsec;
+ struct test *cur_test;
+
+ t = &cpu_threads[0];
+ printf("\n\tStarted mqueue performance test thread on CPU %d\n",
+ cpus_to_pin[0]);
+ if ((mq_prio_max = sysconf(_SC_MQ_PRIO_MAX)) == -1)
+ shutdown(2, "sysconf(_SC_MQ_PRIO_MAX)", __LINE__);
+ if (pthread_getcpuclockid(cpu_threads[0], &clock) != 0)
+ shutdown(2, "pthread_getcpuclockid", __LINE__);
+
+ if (clock_getres(clock, &res))
+ shutdown(2, "clock_getres()", __LINE__);
+
+ printf("\t\tMax priorities:\t\t\t%d\n", mq_prio_max);
+ printf("\t\tClock resolution:\t\t%ld nsec%s\n", res.tv_nsec,
+ res.tv_nsec > 1 ? "s" : "");
+
+
+
+ printf("\n\tTest #1: Time send/recv message, queue "
+ "empty\n");
+ printf("\t\t(%d iterations)\n", TEST1_LOOPS);
+ prio_out = 0;
+ send_total.tv_sec = 0;
+ send_total.tv_nsec = 0;
+ recv_total.tv_sec = 0;
+ recv_total.tv_nsec = 0;
+ for (i = 0; i < TEST1_LOOPS; i++)
+ do_send_recv();
+ printf("\t\tSend msg:\t\t\t%ld.%09lds total time\n",
+ (long)send_total.tv_sec, send_total.tv_nsec);
+ nsec = ((unsigned long long)send_total.tv_sec * 1000000000 +
+ send_total.tv_nsec) / TEST1_LOOPS;
+ printf("\t\t\t\t\t\t%llu nsec/msg\n", nsec);
+ printf("\t\tRecv msg:\t\t\t%ld.%09lds total time\n",
+ (long)recv_total.tv_sec, recv_total.tv_nsec);
+ nsec = ((unsigned long long)recv_total.tv_sec * 1000000000 +
+ recv_total.tv_nsec) / TEST1_LOOPS;
+ printf("\t\t\t\t\t\t%llu nsec/msg\n", nsec);
+
+
+ for (cur_test = test2; cur_test->desc != NULL; cur_test++) {
+ printf("%s", cur_test->desc);
+ printf("\t\t(%d iterations)\n", TEST2_LOOPS);
+ prio_out = 0;
+ send_total.tv_sec = 0;
+ send_total.tv_nsec = 0;
+ recv_total.tv_sec = 0;
+ recv_total.tv_nsec = 0;
+ printf("\t\tFilling queue...");
+ fflush(stdout);
+ clock_gettime(clock, &start);
+ for (i = 0; i < result.mq_maxmsg - 1; i++) {
+ do_untimed_send();
+ cur_test->func(&prio_out);
+ }
+ clock_gettime(clock, &end);
+ nsec = ((unsigned long long)(end.tv_sec - start.tv_sec) *
+ 1000000000) + (end.tv_nsec - start.tv_nsec);
+ printf("done.\t\t%llu.%09llus\n", nsec / 1000000000,
+ nsec % 1000000000);
+ printf("\t\tTesting...");
+ fflush(stdout);
+ for (i = 0; i < TEST2_LOOPS; i++) {
+ do_send_recv();
+ cur_test->func(&prio_out);
+ }
+ printf("done.\n");
+ printf("\t\tSend msg:\t\t\t%ld.%09lds total time\n",
+ (long)send_total.tv_sec, send_total.tv_nsec);
+ nsec = ((unsigned long long)send_total.tv_sec * 1000000000 +
+ send_total.tv_nsec) / TEST2_LOOPS;
+ printf("\t\t\t\t\t\t%llu nsec/msg\n", nsec);
+ printf("\t\tRecv msg:\t\t\t%ld.%09lds total time\n",
+ (long)recv_total.tv_sec, recv_total.tv_nsec);
+ nsec = ((unsigned long long)recv_total.tv_sec * 1000000000 +
+ recv_total.tv_nsec) / TEST2_LOOPS;
+ printf("\t\t\t\t\t\t%llu nsec/msg\n", nsec);
+ printf("\t\tDraining queue...");
+ fflush(stdout);
+ clock_gettime(clock, &start);
+ drain_queue();
+ clock_gettime(clock, &end);
+ nsec = ((unsigned long long)(end.tv_sec - start.tv_sec) *
+ 1000000000) + (end.tv_nsec - start.tv_nsec);
+ printf("done.\t\t%llu.%09llus\n", nsec / 1000000000,
+ nsec % 1000000000);
+ }
+ return 0;
+}
+
+void increase_limits(void)
+{
+ cur_limits.rlim_cur = RLIM_INFINITY;
+ cur_limits.rlim_max = RLIM_INFINITY;
+ setr(RLIMIT_MSGQUEUE, &cur_limits);
+ while (try_set(max_msgs, cur_max_msgs += 10));
+ cur_max_msgs = get(max_msgs);
+ while (try_set(max_msgsize, cur_max_msgsize += 1024));
+ cur_max_msgsize = get(max_msgsize);
+ if (setpriority(PRIO_PROCESS, 0, -20) != 0)
+ shutdown(2, "setpriority()", __LINE__);
+ cur_nice = -20;
+}
+
+int main(int argc, char *argv[])
+{
+ struct mq_attr attr;
+ char *option, *next_option;
+ int i, cpu;
+ struct sigaction sa;
+ poptContext popt_context;
+ int rc;
+ void *retval;
+
+ main_thread = pthread_self();
+ num_cpus_to_pin = 0;
+
+ if (sysconf(_SC_NPROCESSORS_ONLN) == -1) {
+ perror("sysconf(_SC_NPROCESSORS_ONLN)");
+ exit(1);
+ }
+ cpus_online = min(MAX_CPUS, sysconf(_SC_NPROCESSORS_ONLN));
+ if ((cpu_set = CPU_ALLOC(cpus_online)) == NULL) {
+ perror("CPU_ALLOC()");
+ exit(1);
+ }
+ cpu_set_size = CPU_ALLOC_SIZE(cpus_online);
+ CPU_ZERO_S(cpu_set_size, cpu_set);
+
+ popt_context = poptGetContext(NULL, argc, (const char **)argv,
+ options, 0);
+
+ while ((rc = poptGetNextOpt(popt_context)) > 0) {
+ switch (rc) {
+ case 'c':
+ continuous_mode = 1;
+ option = cpu_option_string;
+ do {
+ next_option = strchr(option, ',');
+ if (next_option)
+ *next_option = '\0';
+ cpu = atoi(option);
+ if (cpu >= cpus_online)
+ fprintf(stderr, "CPU %d exceeds "
+ "cpus online, ignoring.\n",
+ cpu);
+ else
+ cpus_to_pin[num_cpus_to_pin++] = cpu;
+ if (next_option)
+ option = ++next_option;
+ } while (next_option && num_cpus_to_pin < MAX_CPUS);
+ /* Double check that they didn't give us the same CPU
+ * more than once */
+ for (cpu = 0; cpu < num_cpus_to_pin; cpu++) {
+ if (CPU_ISSET_S(cpus_to_pin[cpu], cpu_set_size,
+ cpu_set)) {
+ fprintf(stderr, "Any given CPU may "
+ "only be given once.\n");
+ exit(1);
+ } else
+ CPU_SET_S(cpus_to_pin[cpu],
+ cpu_set_size, cpu_set);
+ }
+ break;
+ case 'p':
+ /*
+ * Although we can create a msg queue with a
+ * non-absolute path name, unlink will fail. So,
+ * if the name doesn't start with a /, add one
+ * when we save it.
+ */
+ option = queue_path;
+ if (*option != '/') {
+ queue_path = malloc(strlen(option) + 2);
+ if (!queue_path) {
+ perror("malloc()");
+ exit(1);
+ }
+ queue_path[0] = '/';
+ queue_path[1] = 0;
+ strcat(queue_path, option);
+ free(option);
+ }
+ break;
+ }
+ }
+
+ if (continuous_mode && num_cpus_to_pin == 0) {
+ fprintf(stderr, "Must pass at least one CPU to continuous "
+ "mode.\n");
+ poptPrintUsage(popt_context, stderr, 0);
+ exit(1);
+ } else if (!continuous_mode) {
+ num_cpus_to_pin = 1;
+ cpus_to_pin[0] = cpus_online - 1;
+ }
+
+ if (getuid() != 0) {
+ fprintf(stderr, "Not running as root, but almost all tests "
+ "require root in order to modify\nsystem settings. "
+ "Exiting.\n");
+ exit(1);
+ }
+
+ max_msgs = fopen(MAX_MSGS, "r+");
+ max_msgsize = fopen(MAX_MSGSIZE, "r+");
+ if (!max_msgs)
+ shutdown(2, "Failed to open msg_max", __LINE__);
+ if (!max_msgsize)
+ shutdown(2, "Failed to open msgsize_max", __LINE__);
+
+ /* Load up the current system values for everything we can */
+ getr(RLIMIT_MSGQUEUE, &saved_limits);
+ cur_limits = saved_limits;
+ saved_max_msgs = cur_max_msgs = get(max_msgs);
+ saved_max_msgsize = cur_max_msgsize = get(max_msgsize);
+ errno = 0;
+ cur_nice = getpriority(PRIO_PROCESS, 0);
+ if (errno)
+ shutdown(2, "getpriority()", __LINE__);
+
+ /* Tell the user our initial state */
+ printf("\nInitial system state:\n");
+ printf("\tUsing queue path:\t\t\t%s\n", queue_path);
+ printf("\tRLIMIT_MSGQUEUE(soft):\t\t\t%ld\n", (long)saved_limits.rlim_cur);
+ printf("\tRLIMIT_MSGQUEUE(hard):\t\t\t%ld\n", (long)saved_limits.rlim_max);
+ printf("\tMaximum Message Size:\t\t\t%d\n", saved_max_msgsize);
+ printf("\tMaximum Queue Size:\t\t\t%d\n", saved_max_msgs);
+ printf("\tNice value:\t\t\t\t%d\n", cur_nice);
+ printf("\n");
+
+ increase_limits();
+
+ printf("Adjusted system state for testing:\n");
+ if (cur_limits.rlim_cur == RLIM_INFINITY) {
+ printf("\tRLIMIT_MSGQUEUE(soft):\t\t\t(unlimited)\n");
+ printf("\tRLIMIT_MSGQUEUE(hard):\t\t\t(unlimited)\n");
+ } else {
+ printf("\tRLIMIT_MSGQUEUE(soft):\t\t\t%ld\n",
+ (long)cur_limits.rlim_cur);
+ printf("\tRLIMIT_MSGQUEUE(hard):\t\t\t%ld\n",
+ (long)cur_limits.rlim_max);
+ }
+ printf("\tMaximum Message Size:\t\t\t%d\n", cur_max_msgsize);
+ printf("\tMaximum Queue Size:\t\t\t%d\n", cur_max_msgs);
+ printf("\tNice value:\t\t\t\t%d\n", cur_nice);
+ printf("\tContinuous mode:\t\t\t(%s)\n", continuous_mode ?
+ (continuous_mode_fake ? "fake mode" : "enabled") :
+ "disabled");
+ printf("\tCPUs to pin:\t\t\t\t%d", cpus_to_pin[0]);
+ for(cpu=1; cpu < num_cpus_to_pin; cpu++)
+ printf(",%d", cpus_to_pin[cpu]);
+ printf("\n");
+
+ sa.sa_sigaction = sig_action_SIGUSR1;
+ sigemptyset(&sa.sa_mask);
+ sigaddset(&sa.sa_mask, SIGHUP);
+ sigaddset(&sa.sa_mask, SIGINT);
+ sigaddset(&sa.sa_mask, SIGQUIT);
+ sigaddset(&sa.sa_mask, SIGTERM);
+ sa.sa_flags = SA_SIGINFO;
+ if (sigaction(SIGUSR1, &sa, NULL) == -1)
+ shutdown(1, "sigaction(SIGUSR1)", __LINE__);
+ sa.sa_sigaction = sig_action;
+ if (sigaction(SIGHUP, &sa, NULL) == -1)
+ shutdown(1, "sigaction(SIGHUP)", __LINE__);
+ if (sigaction(SIGINT, &sa, NULL) == -1)
+ shutdown(1, "sigaction(SIGINT)", __LINE__);
+ if (sigaction(SIGQUIT, &sa, NULL) == -1)
+ shutdown(1, "sigaction(SIGQUIT)", __LINE__);
+ if (sigaction(SIGTERM, &sa, NULL) == -1)
+ shutdown(1, "sigaction(SIGTERM)", __LINE__);
+
+ if (!continuous_mode_fake) {
+ attr.mq_flags = O_NONBLOCK;
+ attr.mq_maxmsg = cur_max_msgs;
+ attr.mq_msgsize = MSG_SIZE;
+ open_queue(&attr);
+ }
+ for (i=0; i < num_cpus_to_pin; i++) {
+ pthread_attr_t thread_attr;
+ void *thread_func;
+
+ if (continuous_mode_fake)
+ thread_func = &fake_cont_thread;
+ else if (continuous_mode)
+ thread_func = &cont_thread;
+ else
+ thread_func = &perf_test_thread;
+
+ CPU_ZERO_S(cpu_set_size, cpu_set);
+ CPU_SET_S(cpus_to_pin[i], cpu_set_size, cpu_set);
+ pthread_attr_init(&thread_attr);
+ pthread_attr_setaffinity_np(&thread_attr, cpu_set_size,
+ cpu_set);
+ if (pthread_create(&cpu_threads[i], &thread_attr, thread_func,
+ NULL))
+ shutdown(1, "pthread_create()", __LINE__);
+ pthread_attr_destroy(&thread_attr);
+ }
+
+ if (!continuous_mode) {
+ pthread_join(cpu_threads[0], &retval);
+ shutdown((long)retval, "perf_test_thread()", __LINE__);
+ } else
+ while(1) sleep(1);
+ shutdown(0,"",0);
+}
--
1.7.7.6

2012-05-01 19:09:45

by Doug Ledford

Subject: [Patch 1/4] ipc/mqueue: improve performance of send/recv

The existing implementation of the POSIX message queue send and recv
functions is, well, abysmal. Even worse than abysmal. I submitted
a patch to increase the maximum POSIX message queue limit to 65536
due to customer needs, however, upon looking over the send/recv
implementation, I realized that my customer needs help with that too
even if they don't know it. The basic problem is that, in the fairly
typical use case for a large queue, namely queueing lots of messages
all at the same priority (I verified with my customer that this is
indeed what their app does), the msg_insert routine is basically
a frikkin' bubble sort. I mean, whoa, that's *so* middle school.

OK, OK, to not slam the original author too much, I'm sure they didn't
envision a queue depth of 50,000+ messages. No one would think that
moving elements in an array, one at a time, and dereferencing each pointer
in that array to check the priority of the message being pointed to, again
one at a time, for 50,000+ times would be good. So let's assume that, as
is typical, the users have found a way to break our code simply by using
it in a way we didn't envision. Fair enough.
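
For anyone who wants to see the cost without a 65,528 deep queue, here is a
small userspace model of inserting into a priority-sorted pointer array. It
is a simplified model written for illustration, not the kernel source: the
sketch_* names, the tiny capacity and the shift counter are all assumptions.
Every existing entry whose priority is greater than or equal to the new one
gets moved up a slot, so n inserts at a single priority cost n(n-1)/2 moves.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_CAP 8   /* tiny capacity, for illustration only */

struct sketch_msg { int prio; };

struct sketch_array_queue {
    struct sketch_msg *slot[SKETCH_CAP]; /* sorted by priority, lowest first */
    int count;
    unsigned long shifts;                /* how many pointers were moved */
};

/* Insert m, keeping the array sorted. Returns 0, or -1 if the queue is full. */
static int sketch_array_insert(struct sketch_array_queue *q,
                               struct sketch_msg *m)
{
    int k;

    assert(q != NULL && m != NULL);
    if (q->count >= SKETCH_CAP)
        return -1;

    /* walk down from the top, moving each entry that must stay above m */
    k = q->count - 1;
    while (k >= 0 && q->slot[k]->prio >= m->prio) {
        q->slot[k + 1] = q->slot[k];
        q->shifts++;
        k--;
    }
    q->slot[k + 1] = m;
    q->count++;
    return 0;
}
```

In this model the oldest message of the highest priority ends up in the top
slot, so removal is cheap; it is the insert that walks the array.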

"So, just how broken is it?", you ask. I wondered the same thing, so I
wrote an app to let me know. It's my next patch. It gave me some
interesting results. Here's what it tested:

1) Interference with other apps - In continuous mode, the app just sits there
   and hits a message queue forever, while you go do something productive on
   another terminal using other CPUs. You then measure how long it takes you
   to do that something productive. Then you restart the app in fake
   continuous mode, and it sits in a tight loop on a CPU while you repeat your
   tests. The whole point of this is to keep one CPU tied up (so it can't be
   used in your other work) but in one case tied up hitting the mqueue code
   so we can see the effect of walking that 65,528 element array one pointer
   at a time on the global CPU cache. If it's bad, then it will slow down
   your app on the other CPUs just by polluting cache mercilessly. In the
   fake case, it will be in a tight loop, but not polluting cache.

2) Testing the mqueue subsystem directly - Here we just run a number of tests
   to see how the mqueue subsystem performs under different conditions. A
   couple conditions are known to be worst case for the old system, and some
   different conditions were expected to be worst case for the new mqueue
   routines, so this tests all of them.
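
The per-message numbers in a test like this come from differencing clock
readings taken around each call and dividing the accumulated total by the
iteration count. A minimal sketch of that bookkeeping follows; elapsed_ns()
and avg_ns_per_msg() are hypothetical helper names, not functions from
mq_perf_tests.c.

```c
#include <assert.h>
#include <time.h>

#define SKETCH_NSEC_PER_SEC 1000000000LL

/* Nanoseconds from *start to *end; assumes end is not earlier than start. */
static long long elapsed_ns(const struct timespec *start,
                            const struct timespec *end)
{
    assert(start != NULL && end != NULL);
    return (long long)(end->tv_sec - start->tv_sec) * SKETCH_NSEC_PER_SEC +
           (end->tv_nsec - start->tv_nsec);
}

/* Average cost per message over a run of iterations (integer division). */
static long long avg_ns_per_msg(long long total_ns, long iterations)
{
    assert(iterations > 0);
    return total_ns / iterations;
}
```

Doing the subtraction in long long keeps the intermediate product from
overflowing on systems where time_t or long is 32 bits.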

So, on to the results already:


Subsystem/Test                          Old                   New

Time to compile linux kernel
(make -j12 on a 6 core CPU)
  Running mqueue test             user  49m10.744s      user  45m26.294s
                                  sys    5m51.924s      sys    4m59.894s
                                  total 55m02.668s      total 50m26.188s

  Running fake test               user  45m32.686s      user  45m18.552s
                                  sys    5m12.465s      sys    4m56.468s
                                  total 50m45.151s      total 50m15.020s

  % slowdown from mqueue
  cache thrashing                       ~8%                   ~.5%

Avg time to send/recv (in nanoseconds per message)
  when queue empty                      305/288               349/318
  when queue full (65528 messages)
    constant priority                   526589/823            362/314
    increasing priority                 403105/916            495/445
    decreasing priority                 73420/594             482/409
    random priority                     280147/920            546/436

Time to fill/drain queue (65528 messages, in seconds)
  constant priority                     17.37/.12             .13/.12
  increasing priority                   4.14/.14              .21/.18
  decreasing priority                   12.93/.13             .21/.18
  random priority                       8.88/.16              .22/.17

So, I think the results speak for themselves. It's possible this
implementation could be improved by caching at least one priority
level in the node tree (that would bring the queue empty performance
more in line with the old implementation), but this works and is *so*
much better than what we had, especially for the common case of a
single priority in use, that further refinements can be in follow on
patches.

Signed-off-by: Doug Ledford <[email protected]>
---
ipc/mqueue.c | 173 +++++++++++++++++++++++++++++++++++++++++++--------------
1 files changed, 130 insertions(+), 43 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 04cc77e..fd49cfc 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -49,6 +49,12 @@
#define STATE_PENDING 1
#define STATE_READY 2

+struct posix_msg_tree_node {
+ struct rb_node rb_node;
+ struct list_head msg_list;
+ int priority;
+};
+
struct ext_wait_queue { /* queue of sleeping tasks */
struct task_struct *task;
struct list_head list;
@@ -61,7 +67,7 @@ struct mqueue_inode_info {
struct inode vfs_inode;
wait_queue_head_t wait_q;

- struct msg_msg **messages;
+ struct rb_root msg_tree;
struct mq_attr attr;

struct sigevent notify;
@@ -108,6 +114,90 @@ static struct ipc_namespace *get_ns_from_inode(struct inode *inode)
return ns;
}

+/* Auxiliary functions to manipulate messages' list */
+static int msg_insert(struct msg_msg *msg, struct mqueue_inode_info *info)
+{
+ struct rb_node **p, *parent = NULL;
+ struct posix_msg_tree_node *leaf;
+
+ p = &info->msg_tree.rb_node;
+ while (*p) {
+ parent = *p;
+ leaf = rb_entry(parent, struct posix_msg_tree_node, rb_node);
+
+ if (likely(leaf->priority == msg->m_type))
+ goto insert_msg;
+ else if (msg->m_type < leaf->priority)
+ p = &(*p)->rb_left;
+ else
+ p = &(*p)->rb_right;
+ };
+ leaf = kzalloc(sizeof(struct posix_msg_tree_node), GFP_KERNEL);
+ if (!leaf)
+ return -ENOMEM;
+ rb_init_node(&leaf->rb_node);
+ INIT_LIST_HEAD(&leaf->msg_list);
+ leaf->priority = msg->m_type;
+ rb_link_node(&leaf->rb_node, parent, p);
+ rb_insert_color(&leaf->rb_node, &info->msg_tree);
+ info->qsize += sizeof(struct posix_msg_tree_node);
+insert_msg:
+ info->attr.mq_curmsgs++;
+ info->qsize += msg->m_ts;
+ list_add_tail(&msg->m_list, &leaf->msg_list);
+ return 0;
+}
+
+static inline struct msg_msg *msg_get(struct mqueue_inode_info *info)
+{
+ struct rb_node **p, *parent = NULL;
+ struct posix_msg_tree_node *leaf;
+ struct msg_msg *msg;
+
+try_again:
+ p = &info->msg_tree.rb_node;
+ while (*p) {
+ parent = *p;
+ /*
+ * During insert, low priorities go to the left and high to
+ * the right. On receive, we want the highest priorities
+ * first, so walk all the way to the right.
+ */
+ p = &(*p)->rb_right;
+ }
+ if (!parent) {
+ if (info->attr.mq_curmsgs) {
+ pr_warn_once("Inconsistency in POSIX message queue, "
+ "no tree element, but supposedly messages "
+ "should exist!\n");
+ info->attr.mq_curmsgs = 0;
+ }
+ return NULL;
+ }
+ leaf = rb_entry(parent, struct posix_msg_tree_node, rb_node);
+ if (list_empty(&leaf->msg_list)) {
+ pr_warn_once("Inconsistency in POSIX message queue, "
+ "empty leaf node but we haven't implemented "
+ "lazy leaf delete!\n");
+ rb_erase(&leaf->rb_node, &info->msg_tree);
+ info->qsize -= sizeof(struct posix_msg_tree_node);
+ kfree(leaf);
+ goto try_again;
+ } else {
+ msg = list_first_entry(&leaf->msg_list,
+ struct msg_msg, m_list);
+ list_del(&msg->m_list);
+ if (list_empty(&leaf->msg_list)) {
+ rb_erase(&leaf->rb_node, &info->msg_tree);
+ info->qsize -= sizeof(struct posix_msg_tree_node);
+ kfree(leaf);
+ }
+ }
+ info->attr.mq_curmsgs--;
+ info->qsize -= msg->m_ts;
+ return msg;
+}
+
static struct inode *mqueue_get_inode(struct super_block *sb,
struct ipc_namespace *ipc_ns, umode_t mode,
struct mq_attr *attr)
@@ -128,7 +218,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb,

if (S_ISREG(mode)) {
struct mqueue_inode_info *info;
- unsigned long mq_bytes, mq_msg_tblsz;
+ unsigned long mq_bytes, mq_treesize;

inode->i_fop = &mqueue_file_operations;
inode->i_size = FILENT_SIZE;
@@ -141,6 +231,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb,
info->notify_owner = NULL;
info->qsize = 0;
info->user = NULL; /* set when all is ok */
+ info->msg_tree = RB_ROOT;
memset(&info->attr, 0, sizeof(info->attr));
info->attr.mq_maxmsg = min(ipc_ns->mq_msg_max,
ipc_ns->mq_msg_default);
@@ -150,16 +241,25 @@ static struct inode *mqueue_get_inode(struct super_block *sb,
info->attr.mq_maxmsg = attr->mq_maxmsg;
info->attr.mq_msgsize = attr->mq_msgsize;
}
- mq_msg_tblsz = info->attr.mq_maxmsg * sizeof(struct msg_msg *);
- if (mq_msg_tblsz > PAGE_SIZE)
- info->messages = vmalloc(mq_msg_tblsz);
- else
- info->messages = kmalloc(mq_msg_tblsz, GFP_KERNEL);
- if (!info->messages)
- goto out_inode;
+ /*
+ * We used to allocate a static array of pointers and account
+ * the size of that array as well as one msg_msg struct per
+ * possible message into the queue size. That's no longer
+ * accurate as the queue is now an rbtree and will grow and
+ * shrink depending on usage patterns. We can, however, still
+ * account one msg_msg struct per message, but the nodes are
+ * allocated depending on priority usage, and most programs
+ * only use one, or a handful, of priorities. However, since
+ * this is pinned memory, we need to assume worst case, so
+ * that means the min(mq_maxmsg, max_priorities) * struct
+ * posix_msg_tree_node.
+ */
+ mq_treesize = info->attr.mq_maxmsg * sizeof(struct msg_msg) +
+ min_t(unsigned int, info->attr.mq_maxmsg, MQ_PRIO_MAX) *
+ sizeof(struct posix_msg_tree_node);

- mq_bytes = (mq_msg_tblsz +
- (info->attr.mq_maxmsg * info->attr.mq_msgsize));
+ mq_bytes = mq_treesize + (info->attr.mq_maxmsg *
+ info->attr.mq_msgsize);

spin_lock(&mq_lock);
if (u->mq_bytes + mq_bytes < u->mq_bytes ||
@@ -250,9 +350,9 @@ static void mqueue_evict_inode(struct inode *inode)
{
struct mqueue_inode_info *info;
struct user_struct *user;
- unsigned long mq_bytes;
- int i;
+ unsigned long mq_bytes, mq_treesize;
struct ipc_namespace *ipc_ns;
+ struct msg_msg *msg;

end_writeback(inode);

@@ -262,17 +362,18 @@ static void mqueue_evict_inode(struct inode *inode)
ipc_ns = get_ns_from_inode(inode);
info = MQUEUE_I(inode);
spin_lock(&info->lock);
- for (i = 0; i < info->attr.mq_curmsgs; i++)
- free_msg(info->messages[i]);
- if (is_vmalloc_addr(info->messages))
- vfree(info->messages);
- else
- kfree(info->messages);
+ while ((msg = msg_get(info)) != NULL)
+ free_msg(msg);
spin_unlock(&info->lock);

/* Total amount of bytes accounted for the mqueue */
- mq_bytes = info->attr.mq_maxmsg * (sizeof(struct msg_msg *)
- + info->attr.mq_msgsize);
+ mq_treesize = info->attr.mq_maxmsg * sizeof(struct msg_msg) +
+ min_t(unsigned int, info->attr.mq_maxmsg, MQ_PRIO_MAX) *
+ sizeof(struct posix_msg_tree_node);
+
+ mq_bytes = mq_treesize + (info->attr.mq_maxmsg *
+ info->attr.mq_msgsize);
+
user = info->user;
if (user) {
spin_lock(&mq_lock);
@@ -492,26 +593,6 @@ static struct ext_wait_queue *wq_get_first_waiter(
return list_entry(ptr, struct ext_wait_queue, list);
}

-/* Auxiliary functions to manipulate messages' list */
-static void msg_insert(struct msg_msg *ptr, struct mqueue_inode_info *info)
-{
- int k;
-
- k = info->attr.mq_curmsgs - 1;
- while (k >= 0 && info->messages[k]->m_type >= ptr->m_type) {
- info->messages[k + 1] = info->messages[k];
- k--;
- }
- info->attr.mq_curmsgs++;
- info->qsize += ptr->m_ts;
- info->messages[k + 1] = ptr;
-}
-
-static inline struct msg_msg *msg_get(struct mqueue_inode_info *info)
-{
- info->qsize -= info->messages[--info->attr.mq_curmsgs]->m_ts;
- return info->messages[info->attr.mq_curmsgs];
-}

static inline void set_cookie(struct sk_buff *skb, char code)
{
@@ -842,7 +923,8 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
wake_up_interruptible(&info->wait_q);
return;
}
- msg_insert(sender->msg, info);
+ if (msg_insert(sender->msg, info))
+ return;
list_del(&sender->list);
sender->state = STATE_PENDING;
wake_up_process(sender->task);
@@ -930,7 +1012,12 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
pipelined_send(info, msg_ptr, receiver);
} else {
/* adds message to the queue */
- msg_insert(msg_ptr, info);
+ if (msg_insert(msg_ptr, info)) {
+ free_msg(msg_ptr);
+ ret = -ENOMEM;
+ spin_unlock(&info->lock);
+ goto out_fput;
+ }
__do_notify(info);
}
inode->i_atime = inode->i_mtime = inode->i_ctime =
--
1.7.7.6
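
For readers following the data structure change, here is a minimal user-space sketch of the idea in the patch above: one tree node per priority in use, each holding a FIFO of messages, with receive always taking the oldest message of the highest priority. It is not the kernel code. It swaps the kernel rbtree for a plain unbalanced binary search tree and the list_head for a hand-rolled singly linked FIFO, and every name in it (struct msg, struct prio_node, sketch_insert, sketch_get) is hypothetical.

```c
#include <assert.h>
#include <stdlib.h>

struct msg {                        /* hypothetical stand-in for struct msg_msg */
	struct msg *next;
	int prio;
	int payload;
};

struct prio_node {                  /* stand-in for struct posix_msg_tree_node */
	struct prio_node *left, *right;
	struct msg *head, *tail;    /* FIFO of messages at this priority */
	int prio;
};

/* Queue m behind any existing messages of the same priority.
 * Returns 0 on success, -1 if a new priority node could not be allocated. */
static int sketch_insert(struct prio_node **root, struct msg *m)
{
	struct prio_node **p = root;

	/* Low priorities go left, high priorities go right. */
	while (*p && (*p)->prio != m->prio)
		p = (m->prio < (*p)->prio) ? &(*p)->left : &(*p)->right;
	if (!*p) {
		*p = calloc(1, sizeof(**p));
		if (!*p)
			return -1;
		(*p)->prio = m->prio;
	}
	m->next = NULL;
	if ((*p)->tail)
		(*p)->tail->next = m;
	else
		(*p)->head = m;
	(*p)->tail = m;
	return 0;
}

/* Remove and return the oldest message of the highest priority, or NULL. */
static struct msg *sketch_get(struct prio_node **root)
{
	struct prio_node **p = root, *node;
	struct msg *m;

	if (!*p)
		return NULL;
	while ((*p)->right)             /* highest priority is the rightmost node */
		p = &(*p)->right;
	node = *p;
	m = node->head;
	assert(m != NULL);              /* nodes are freed as soon as they empty */
	node->head = m->next;
	if (!node->head) {
		*p = node->left;        /* rightmost node has no right child */
		free(node);
	}
	return m;
}
```

With a single priority in use the tree has one node, so both operations reduce to a list append and a list pop, which is the common case the commit message calls out. An unbalanced tree like this one can degrade to a linear walk with many priorities; the real patch avoids that by using the kernel's red-black tree.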

2012-05-01 19:34:33

by Andrew Morton

Subject: Re: [Patch 2/4] ipc/mqueue: correct mq_attr_ok test

On Tue, 1 May 2012 13:50:53 -0400
Doug Ledford <[email protected]> wrote:

> While working on the other parts of the mqueue stuff, I noticed that
> the calculation for overflow in mq_attr_ok didn't actually match
> reality (this is especially true since my last patch which changed
> how we account memory slightly).

Please cc Manfred on mqueue things? He still watches ;)

> In particular, we used to test for overflow using:
> msgs * msgsize + msgs * sizeof(struct msg_msg *)
>
> That was never really correct because each message we allocate via
> load_msg() is actually a struct msg_msg followed by the data for
> the message (and if struct msg_msg + data exceeds PAGE_SIZE we end
> up allocating struct msg_msgseg structs too, but accounting for them
> would get really tedious, so let's ignore those...they're only a
> pointer in size anyway). This patch updates the calculation to be
> more accurate in regards to maximum possible memory consumption by the
> mqueue.
>
> ...
>
> --- a/ipc/mqueue.c
> +++ b/ipc/mqueue.c
>
> ...
>
> @@ -684,8 +686,11 @@ static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr)
> /* check for overflow */
> if (attr->mq_msgsize > ULONG_MAX/attr->mq_maxmsg)
> return 0;
> - if ((unsigned long)(attr->mq_maxmsg * (attr->mq_msgsize
> - + sizeof (struct msg_msg *))) <
> + mq_treesize = attr->mq_maxmsg * sizeof(struct msg_msg) +
> + min_t(unsigned int, attr->mq_maxmsg, MQ_PRIO_MAX) *
> + sizeof(struct posix_msg_tree_node);
> + if ((unsigned long)(attr->mq_maxmsg * attr->mq_msgsize +
> + mq_treesize) <
> (unsigned long)(attr->mq_maxmsg * attr->mq_msgsize))
> return 0;
> return 1;

That's a bit of a mouthful. Does this look OK?

--- a/ipc/mqueue.c~ipc-mqueue-correct-mq_attr_ok-test-fix
+++ a/ipc/mqueue.c
@@ -672,7 +672,8 @@ static void remove_notification(struct m
static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr)
{
int mq_treesize;
-
+ unsigned long total_size;
+
if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
return 0;
if (capable(CAP_SYS_RESOURCE)) {
@@ -690,9 +691,8 @@ static int mq_attr_ok(struct ipc_namespa
mq_treesize = attr->mq_maxmsg * sizeof(struct msg_msg) +
min_t(unsigned int, attr->mq_maxmsg, MQ_PRIO_MAX) *
sizeof(struct posix_msg_tree_node);
- if ((unsigned long)(attr->mq_maxmsg * attr->mq_msgsize +
- mq_treesize) <
- (unsigned long)(attr->mq_maxmsg * attr->mq_msgsize))
+ total_size = attr->mq_maxmsg * attr->mq_msgsize;
+ if (total_size + mq_treesize < total_size)
return 0;
return 1;
}
_
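
As an illustration of the wrap-around test that the simplified version relies on, here is a small user-space sketch. It assumes all quantities are plain unsigned long values (the kernel code mixes long and int fields), and the function name sketch_total_fits is hypothetical. Unsigned arithmetic in C wraps modulo 2^N, so a sum that comes out smaller than one of its operands has overflowed.

```c
#include <assert.h>
#include <limits.h>

/* Returns 1 if maxmsg * msgsize + treesize fits in an unsigned long,
 * 0 if either the product or the sum would wrap around. */
static int sketch_total_fits(unsigned long maxmsg, unsigned long msgsize,
			     unsigned long treesize)
{
	unsigned long total_size;

	assert(maxmsg > 0);                     /* avoid dividing by zero below */
	if (msgsize > ULONG_MAX / maxmsg)       /* product would wrap */
		return 0;
	total_size = maxmsg * msgsize;
	if (total_size + treesize < total_size) /* sum wrapped */
		return 0;
	return 1;
}
```

The division check has to come first: once the product itself has wrapped, comparing the sum against it no longer says anything useful.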

2012-05-01 19:38:09

by Doug Ledford

Subject: Re: [Patch 2/4] ipc/mqueue: correct mq_attr_ok test

On 05/01/2012 03:34 PM, Andrew Morton wrote:
> On Tue, 1 May 2012 13:50:53 -0400
> Doug Ledford <[email protected]> wrote:
>
>> While working on the other parts of the mqueue stuff, I noticed that
>> the calculation for overflow in mq_attr_ok didn't actually match
>> reality (this is especially true since my last patch which changed
>> how we account memory slightly).
>
> Please cc Manfred on mqueue things? He still watches ;)
>
>> In particular, we used to test for overflow using:
>> msgs * msgsize + msgs * sizeof(struct msg_msg *)
>>
>> That was never really correct because each message we allocate via
>> load_msg() is actually a struct msg_msg followed by the data for
>> the message (and if struct msg_msg + data exceeds PAGE_SIZE we end
>> up allocating struct msg_msgseg structs too, but accounting for them
>> would get really tedious, so let's ignore those...they're only a
>> pointer in size anyway). This patch updates the calculation to be
>> more accurate in regards to maximum possible memory consumption by the
>> mqueue.
>>
>> ...
>>
>> --- a/ipc/mqueue.c
>> +++ b/ipc/mqueue.c
>>
>> ...
>>
>> @@ -684,8 +686,11 @@ static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr)
>> /* check for overflow */
>> if (attr->mq_msgsize > ULONG_MAX/attr->mq_maxmsg)
>> return 0;
>> - if ((unsigned long)(attr->mq_maxmsg * (attr->mq_msgsize
>> - + sizeof (struct msg_msg *))) <
>> + mq_treesize = attr->mq_maxmsg * sizeof(struct msg_msg) +
>> + min_t(unsigned int, attr->mq_maxmsg, MQ_PRIO_MAX) *
>> + sizeof(struct posix_msg_tree_node);
>> + if ((unsigned long)(attr->mq_maxmsg * attr->mq_msgsize +
>> + mq_treesize) <
>> (unsigned long)(attr->mq_maxmsg * attr->mq_msgsize))
>> return 0;
>> return 1;
>
> That's a bit of a mouthful. Does this look OK?
>
> --- a/ipc/mqueue.c~ipc-mqueue-correct-mq_attr_ok-test-fix
> +++ a/ipc/mqueue.c
> @@ -672,7 +672,8 @@ static void remove_notification(struct m
> static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr)
> {
> int mq_treesize;
> -
> + unsigned long total_size;
> +
> if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
> return 0;
> if (capable(CAP_SYS_RESOURCE)) {
> @@ -690,9 +691,8 @@ static int mq_attr_ok(struct ipc_namespa
> mq_treesize = attr->mq_maxmsg * sizeof(struct msg_msg) +
> min_t(unsigned int, attr->mq_maxmsg, MQ_PRIO_MAX) *
> sizeof(struct posix_msg_tree_node);
> - if ((unsigned long)(attr->mq_maxmsg * attr->mq_msgsize +
> - mq_treesize) <
> - (unsigned long)(attr->mq_maxmsg * attr->mq_msgsize))
> + total_size = attr->mq_maxmsg * attr->mq_msgsize;
> + if (total_size + mq_treesize < total_size)
> return 0;
> return 1;
> }

Sure, looks fine to me and should preserve the wrap around test behavior.


--
Doug Ledford <[email protected]>
GPG KeyID: 0E572FDD
http://people.redhat.com/dledford




2012-05-01 19:53:48

by Andrew Morton

Subject: Re: [Patch 4/4] tools/selftests: add mq_perf_tests

On Tue, 1 May 2012 13:50:55 -0400
Doug Ledford <[email protected]> wrote:

> Add the mq_perf_tests tool I used when creating my mq performance patch.
> Also add a local .gitignore to keep the binaries from showing up in
> git status output.
>

hm, this code sends checkpatch berzerk. I do think that selftests code
should match regular kernel coding - after all, kernel developers are
the ones who will be reading and modifying the code.

> diff --git a/tools/testing/selftests/mqueue/mq_perf_tests.c b/tools/testing/selftests/mqueue/mq_perf_tests.c

hm, I didn't have <popt.h>. On RH that's the popt-devel RPM. On this
Ubuntu(ish) machine it's libpopt-dev.

On an x86_64 build I get these:

mq_open_tests.c: In function 'main':
mq_open_tests.c:295: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
mq_open_tests.c:296: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
mq_open_tests.c:311: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
mq_open_tests.c:312: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
gcc -O2 -lrt -lpthread -lpopt -o mq_perf_tests mq_perf_tests.c
mq_perf_tests.c: In function 'open_queue':
mq_perf_tests.c:299: warning: format '%d' expects type 'int', but argument 2 has type 'long int'
mq_perf_tests.c:300: warning: format '%d' expects type 'int', but argument 2 has type 'long int'
mq_perf_tests.c:301: warning: format '%d' expects type 'int', but argument 2 has type 'long int'
mq_perf_tests.c: In function 'perf_test_thread':
mq_perf_tests.c:441: warning: format '%d' expects type 'int', but argument 2 has type 'long int'
mq_perf_tests.c:456: warning: format '%d' expects type 'int', but argument 2 has type '__time_t'
mq_perf_tests.c:456: warning: format '%d' expects type 'int', but argument 3 has type 'long int'
mq_perf_tests.c:459: warning: format '%d' expects type 'int', but argument 2 has type 'long long unsigned int'
mq_perf_tests.c:461: warning: format '%d' expects type 'int', but argument 2 has type '__time_t'
mq_perf_tests.c:461: warning: format '%d' expects type 'int', but argument 3 has type 'long int'
mq_perf_tests.c:464: warning: format '%d' expects type 'int', but argument 2 has type 'long long unsigned int'
mq_perf_tests.c:468: warning: format not a string literal and no format arguments
mq_perf_tests.c:495: warning: format '%d' expects type 'int', but argument 2 has type '__time_t'
mq_perf_tests.c:495: warning: format '%d' expects type 'int', but argument 3 has type 'long int'
mq_perf_tests.c:498: warning: format '%d' expects type 'int', but argument 2 has type 'long long unsigned int'
mq_perf_tests.c:500: warning: format '%d' expects type 'int', but argument 2 has type '__time_t'
mq_perf_tests.c:500: warning: format '%d' expects type 'int', but argument 3 has type 'long int'
mq_perf_tests.c:503: warning: format '%d' expects type 'int', but argument 2 has type 'long long unsigned int'
mq_perf_tests.c: In function 'main':
mq_perf_tests.c:651: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
mq_perf_tests.c:652: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
mq_perf_tests.c:666: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
mq_perf_tests.c:668: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'

I assume part of it is this:

/usr/include/bits/resource.h:typedef __rlim64_t rlim_t;

But I didn't look into the others. I can do so?
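
For reference, warnings of this kind typically arise when '%d' is paired with a type wider than int, as rlim_t, time_t, long and unsigned long long are on x86_64. One common way to silence them portably is to cast to a fixed wide type and use the matching conversion. The sketch below shows that approach only; it is not taken from mq_perf_tests.c, and the function name and output format are hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <sys/resource.h>
#include <time.h>

/* Format a resource limit, an elapsed time and a counter without
 * relying on the platform-specific width of rlim_t or time_t.
 * Returns the number of characters snprintf() would have written. */
static int sketch_format_stats(char *buf, size_t len, rlim_t lim,
			       const struct timespec *ts,
			       unsigned long long count)
{
	assert(buf != NULL && ts != NULL);
	return snprintf(buf, len, "limit=%llu elapsed=%lld.%09ld count=%llu",
			(unsigned long long)lim,   /* rlim_t width varies */
			(long long)ts->tv_sec,     /* time_t width varies */
			(long)ts->tv_nsec,
			count);
}
```

Casting at the call site keeps the format string identical on 32-bit and 64-bit builds, at the cost of a little noise in each printf() call.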

2012-05-01 20:01:57

by KOSAKI Motohiro

Subject: Re: [Patch 3/4] ipc/mqueue: strengthen checks on mqueue creation

(5/1/12 1:50 PM), Doug Ledford wrote:
> We already check the mq attr struct if it's passed in, but now that the
> admin can set system wide defaults separate from maximums, it's actually
> possible to set the defaults to something that would overflow. So,
> if there is no attr struct passed in to the open call, check the default
> values.
>
> While we are at it, simplify mq_attr_ok() by making it return 0 or an
> error condition, so that way if we add more tests to it later, we have
> the option of what error should be returned instead of the calling
> location having to pick a possibly inaccurate error code.
>
> Signed-off-by: Doug Ledford<[email protected]>
> ---
> ipc/mqueue.c | 27 ++++++++++++++++++---------
> 1 files changed, 18 insertions(+), 9 deletions(-)
>
> diff --git a/ipc/mqueue.c b/ipc/mqueue.c
> index 4b2892e..6089f73 100644
> --- a/ipc/mqueue.c
> +++ b/ipc/mqueue.c
> @@ -673,27 +673,27 @@ static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr)
> int mq_treesize;
>
> if (attr->mq_maxmsg<= 0 || attr->mq_msgsize<= 0)
> - return 0;
> + return -EINVAL;
> if (capable(CAP_SYS_RESOURCE)) {
> if (attr->mq_maxmsg> HARD_MSGMAX ||
> attr->mq_msgsize> HARD_MSGSIZEMAX)
> - return 0;
> + return -EINVAL;
> } else {
> if (attr->mq_maxmsg> ipc_ns->mq_msg_max ||
> attr->mq_msgsize> ipc_ns->mq_msgsize_max)
> - return 0;
> + return -EINVAL;
> }
> /* check for overflow */
> if (attr->mq_msgsize> ULONG_MAX/attr->mq_maxmsg)
> - return 0;
> + return -ENOMEM;
> mq_treesize = attr->mq_maxmsg * sizeof(struct msg_msg) +
> min_t(unsigned int, attr->mq_maxmsg, MQ_PRIO_MAX) *
> sizeof(struct posix_msg_tree_node);
> if ((unsigned long)(attr->mq_maxmsg * attr->mq_msgsize +
> mq_treesize)<
> (unsigned long)(attr->mq_maxmsg * attr->mq_msgsize))
> - return 0;
> - return 1;
> + return -ENOMEM;
> + return 0;

But ENOMEM is more inaccurate. It almostly is used for kmalloc failure.

2012-05-01 20:11:40

by Doug Ledford

Subject: Re: [Patch 3/4] ipc/mqueue: strengthen checks on mqueue creation

On 05/01/2012 04:01 PM, KOSAKI Motohiro wrote:
> (5/1/12 1:50 PM), Doug Ledford wrote:
>> We already check the mq attr struct if it's passed in, but now that the
>> admin can set system wide defaults separate from maximums, it's actually
>> possible to set the defaults to something that would overflow. So,
>> if there is no attr struct passed in to the open call, check the default
>> values.
>>
>> While we are at it, simplify mq_attr_ok() by making it return 0 or an
>> error condition, so that way if we add more tests to it later, we have
>> the option of what error should be returned instead of the calling
>> location having to pick a possibly inaccurate error code.
>>
>> Signed-off-by: Doug Ledford<[email protected]>
>> ---
>> ipc/mqueue.c | 27 ++++++++++++++++++---------
>> 1 files changed, 18 insertions(+), 9 deletions(-)
>>
>> diff --git a/ipc/mqueue.c b/ipc/mqueue.c
>> index 4b2892e..6089f73 100644
>> --- a/ipc/mqueue.c
>> +++ b/ipc/mqueue.c
>> @@ -673,27 +673,27 @@ static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr)
>> int mq_treesize;
>>
>> if (attr->mq_maxmsg<= 0 || attr->mq_msgsize<= 0)
>> - return 0;
>> + return -EINVAL;
>> if (capable(CAP_SYS_RESOURCE)) {
>> if (attr->mq_maxmsg> HARD_MSGMAX ||
>> attr->mq_msgsize> HARD_MSGSIZEMAX)
>> - return 0;
>> + return -EINVAL;
>> } else {
>> if (attr->mq_maxmsg> ipc_ns->mq_msg_max ||
>> attr->mq_msgsize> ipc_ns->mq_msgsize_max)
>> - return 0;
>> + return -EINVAL;
>> }
>> /* check for overflow */
>> if (attr->mq_msgsize> ULONG_MAX/attr->mq_maxmsg)
>> - return 0;
>> + return -ENOMEM;
>> mq_treesize = attr->mq_maxmsg * sizeof(struct msg_msg) +
>> min_t(unsigned int, attr->mq_maxmsg, MQ_PRIO_MAX) *
>> sizeof(struct posix_msg_tree_node);
>> if ((unsigned long)(attr->mq_maxmsg * attr->mq_msgsize +
>> mq_treesize)<
>> (unsigned long)(attr->mq_maxmsg * attr->mq_msgsize))
>> - return 0;
>> - return 1;
>> + return -ENOMEM;
>> + return 0;
>
> But ENOMEM is more inaccurate. It almostly is used for kmalloc failure.

I chose ENOMEM for that particular error because above there we have
checked the passed in arguments to make sure that they don't violate our
allowances for max message or max message size. If we violate either of
those items, we return EINVAL. In this case, neither of the values is
invalid, it's just that together they make an overly large allocation.
I would see that as more helpful to a programmer than EINVAL when the
values are within the maximums allowed. At least with ENOMEM the
programmer knows they have to reduce their combined message size and
message count in order to get things working.


--
Doug Ledford <[email protected]>
GPG KeyID: 0E572FDD
http://people.redhat.com/dledford




2012-05-01 20:14:17

by Doug Ledford

Subject: Re: [Patch 4/4] tools/selftests: add mq_perf_tests

On 05/01/2012 03:53 PM, Andrew Morton wrote:
> On Tue, 1 May 2012 13:50:55 -0400
> Doug Ledford <[email protected]> wrote:
>
>> Add the mq_perf_tests tool I used when creating my mq performance patch.
>> Also add a local .gitignore to keep the binaries from showing up in
>> git status output.
>>
>
> hm, this code sends checkpatch berzerk. I do think that selftests code
> should match regular kernel coding - after all, kernel developers are
> the ones who will be reading and modifying the code.

Fair enough.

>> diff --git a/tools/testing/selftests/mqueue/mq_perf_tests.c b/tools/testing/selftests/mqueue/mq_perf_tests.c
>
> hm, I didn't have <popt.h>. On RH that's the popt-devel RPM. On this
> Ubuntu(ish) machine it's libpopt-dev.

Yeah, I didn't want to throw any sort of high level requires in there,
like rpm dependencies or apt dependencies. I could get the rpm stuff
for Fedora right, but I would have just screwed up the rest. So a user
just has to manually make sure they have the right libraries to run this.

> On an x86_64 build I get these:
>
> mq_open_tests.c: In function 'main':
> mq_open_tests.c:295: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
> mq_open_tests.c:296: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
> mq_open_tests.c:311: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
> mq_open_tests.c:312: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
> gcc -O2 -lrt -lpthread -lpopt -o mq_perf_tests mq_perf_tests.c
> mq_perf_tests.c: In function 'open_queue':
> mq_perf_tests.c:299: warning: format '%d' expects type 'int', but argument 2 has type 'long int'
> mq_perf_tests.c:300: warning: format '%d' expects type 'int', but argument 2 has type 'long int'
> mq_perf_tests.c:301: warning: format '%d' expects type 'int', but argument 2 has type 'long int'
> mq_perf_tests.c: In function 'perf_test_thread':
> mq_perf_tests.c:441: warning: format '%d' expects type 'int', but argument 2 has type 'long int'
> mq_perf_tests.c:456: warning: format '%d' expects type 'int', but argument 2 has type '__time_t'
> mq_perf_tests.c:456: warning: format '%d' expects type 'int', but argument 3 has type 'long int'
> mq_perf_tests.c:459: warning: format '%d' expects type 'int', but argument 2 has type 'long long unsigned int'
> mq_perf_tests.c:461: warning: format '%d' expects type 'int', but argument 2 has type '__time_t'
> mq_perf_tests.c:461: warning: format '%d' expects type 'int', but argument 3 has type 'long int'
> mq_perf_tests.c:464: warning: format '%d' expects type 'int', but argument 2 has type 'long long unsigned int'
> mq_perf_tests.c:468: warning: format not a string literal and no format arguments
> mq_perf_tests.c:495: warning: format '%d' expects type 'int', but argument 2 has type '__time_t'
> mq_perf_tests.c:495: warning: format '%d' expects type 'int', but argument 3 has type 'long int'
> mq_perf_tests.c:498: warning: format '%d' expects type 'int', but argument 2 has type 'long long unsigned int'
> mq_perf_tests.c:500: warning: format '%d' expects type 'int', but argument 2 has type '__time_t'
> mq_perf_tests.c:500: warning: format '%d' expects type 'int', but argument 3 has type 'long int'
> mq_perf_tests.c:503: warning: format '%d' expects type 'int', but argument 2 has type 'long long unsigned int'
> mq_perf_tests.c: In function 'main':
> mq_perf_tests.c:651: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
> mq_perf_tests.c:652: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
> mq_perf_tests.c:666: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
> mq_perf_tests.c:668: warning: format '%d' expects type 'int', but argument 2 has type 'rlim_t'
>
> I assume part of it is this:
>
> /usr/include/bits/resource.h:typedef __rlim64_t rlim_t;
>
> But I didn't look into the others. I can do so?

I don't get any of that sort of noise, it all compiles cleanly here.

--
Doug Ledford <[email protected]>
GPG KeyID: 0E572FDD
http://people.redhat.com/dledford




2012-05-01 20:19:15

by KOSAKI Motohiro

Subject: Re: [Patch 3/4] ipc/mqueue: strengthen checks on mqueue creation

>> But ENOMEM is more inaccurate. It almostly is used for kmalloc failure.
>
> I chose ENOMEM for that particular error because above there we have
> checked the passed in arguments to make sure that they don't violate our
> allowances for max message or max message size. If we violate either of
> those items, we return EINVAL. In this case, neither of the values is
> invalid, it's just that together they make an overly large allocation.
> I would see that as more helpful to a programmer than EINVAL when the
> values are within the maximums allowed. At least with ENOMEM the
> programmer knows they have to reduce their combined message size and
> message count in order to get things working.

Incorrect. When ENOMEM is returned, programmers can't know
which problem was happen 1) kernel has real memory starvation
or 2) queue limitation exceed was happen. The problem is, you
introduced new overloaded error code for avoiding overload error code.
It doesn't make sense. My question was, why can't you choose no
overload error code if you want accurate one?

2012-05-01 23:03:09

by Doug Ledford

Subject: Re: [Patch 3/4] ipc/mqueue: strengthen checks on mqueue creation

On 5/1/2012 4:18 PM, KOSAKI Motohiro wrote:
>>> But ENOMEM is more inaccurate. It almostly is used for kmalloc failure.
>>
>> I chose ENOMEM for that particular error because above there we have
>> checked the passed in arguments to make sure that they don't violate our
>> allowances for max message or max message size. If we violate either of
>> those items, we return EINVAL. In this case, neither of the values is
>> invalid, it's just that together they make an overly large allocation.
>> I would see that as more helpful to a programmer than EINVAL when the
>> values are within the maximums allowed. At least with ENOMEM the
>> programmer knows they have to reduce their combined message size and
>> message count in order to get things working.
>
> Incorrect. When ENOMEM is returned, programmers can't know
> which problem was happen 1) kernel has real memory starvation
> or 2) queue limitation exceed was happen. The problem is, you
> introduced new overloaded error code for avoiding overload error code.
> It doesn't make sense. My question was, why can't you choose no
> overload error code if you want accurate one?

OK, then would EOVERFLOW suit things better?

All this reminds me that when this is taken into Linus' kernel, we need
to coordinate a man page update for the mq subsystem.


--
Doug Ledford <[email protected]>
GPG KeyID: 0E572FDD
http://people.redhat.com/dledford

Infiniband specific RPMs available at
http://people.redhat.com/dledford/Infiniband



2012-05-01 23:04:57

by KOSAKI Motohiro

Subject: Re: [Patch 3/4] ipc/mqueue: strengthen checks on mqueue creation

(5/1/12 7:02 PM), Doug Ledford wrote:
> On 5/1/2012 4:18 PM, KOSAKI Motohiro wrote:
>>>> But ENOMEM is more inaccurate. It almostly is used for kmalloc failure.
>>>
>>> I chose ENOMEM for that particular error because above there we have
>>> checked the passed in arguments to make sure that they don't violate our
>>> allowances for max message or max message size. If we violate either of
>>> those items, we return EINVAL. In this case, neither of the values is
>>> invalid, it's just that together they make an overly large allocation.
>>> I would see that as more helpful to a programmer than EINVAL when the
>>> values are within the maximums allowed. At least with ENOMEM the
>>> programmer knows they have to reduce their combined message size and
>>> message count in order to get things working.
>>
>> Incorrect. When ENOMEM is returned, programmers can't know
>> which problem was happen 1) kernel has real memory starvation
>> or 2) queue limitation exceed was happen. The problem is, you
>> introduced new overloaded error code for avoiding overload error code.
>> It doesn't make sense. My question was, why can't you choose no
>> overload error code if you want accurate one?
>
> OK, then would EOVERFLOW suit things better?

I have no seen to any confusion source this. thank you.

then, ack all of this series.
Acked-by: KOSAKI Motohiro <[email protected]>


>
> All this reminds me that when this is taken into Linus' kernel, we need
> to coordinate a man page update for the mq subsystem.

2012-05-01 23:11:35

by Andrew Morton

Subject: Re: [Patch 3/4] ipc/mqueue: strengthen checks on mqueue creation

On Tue, 01 May 2012 19:04:53 -0400
KOSAKI Motohiro <[email protected]> wrote:

> > OK, then would EOVERFLOW suit things better?
>
> I don't see any source of confusion with this. Thank you.

--- a/ipc/mqueue.c~ipc-mqueue-strengthen-checks-on-mqueue-creation-fix
+++ a/ipc/mqueue.c
@@ -687,13 +687,13 @@ static int mq_attr_ok(struct ipc_namespa
 	}
 	/* check for overflow */
 	if (attr->mq_msgsize > ULONG_MAX/attr->mq_maxmsg)
-		return -ENOMEM;
+		return -EOVERFLOW;
 	mq_treesize = attr->mq_maxmsg * sizeof(struct msg_msg) +
 		min_t(unsigned int, attr->mq_maxmsg, MQ_PRIO_MAX) *
 		sizeof(struct posix_msg_tree_node);
 	total_size = attr->mq_maxmsg * attr->mq_msgsize;
 	if (total_size + mq_treesize < total_size)
-		return -ENOMEM;
+		return -EOVERFLOW;
 	return 0;
 }

_
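
For anyone reading along, here is a rough user-space sketch of the two overflow checks in the hunk above. It is not the kernel code: the struct sizes are placeholder constants (assumptions, since the real sizeof values depend on the kernel build), every sketch_/SKETCH_ name is made up for illustration, and the 65536 cap and 32768 priority limit are the figures quoted elsewhere in this thread.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>

/* Placeholder sizes: assumptions for illustration, NOT the kernel's real sizeof values. */
#define SKETCH_MSG_MSG_SIZE   48UL
#define SKETCH_TREE_NODE_SIZE 40UL
/* Limits as quoted in this thread. */
#define SKETCH_HARD_MSGMAX    65536UL
#define SKETCH_MQ_PRIO_MAX    32768UL

/* Hypothetical stand-in for the tail of mq_attr_ok(); returns 0 or a negative errno. */
static int sketch_mq_attr_ok(unsigned long maxmsg, unsigned long msgsize)
{
	unsigned long prios, treesize, total;

	/* Out-of-range arguments are a plain EINVAL, as discussed in the thread. */
	if (maxmsg == 0 || msgsize == 0 || maxmsg > SKETCH_HARD_MSGMAX)
		return -EINVAL;

	/* Check 1: would maxmsg * msgsize wrap around? */
	if (msgsize > ULONG_MAX / maxmsg)
		return -EOVERFLOW;

	/* Worst case: one msg_msg per message, one tree node per priority in use. */
	prios = maxmsg < SKETCH_MQ_PRIO_MAX ? maxmsg : SKETCH_MQ_PRIO_MAX;
	treesize = maxmsg * SKETCH_MSG_MSG_SIZE + prios * SKETCH_TREE_NODE_SIZE;

	/* Check 2: would adding the bookkeeping overhead wrap around? */
	total = maxmsg * msgsize;
	if (total + treesize < total)
		return -EOVERFLOW;

	return 0;
}
```

The point of the errno change is visible in the return values: arguments that are individually out of range give -EINVAL, while a combination that is individually fine but too large in total gives -EOVERFLOW, so neither case is confused with a real allocation failure.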

2012-05-03 09:18:02

by Dan Carpenter

Subject: Re: [Patch 1/4] ipc/mqueue: improve performance of send/recv

On Tue, May 01, 2012 at 01:50:52PM -0400, Doug Ledford wrote:
> @@ -150,16 +241,25 @@ static struct inode *mqueue_get_inode(struct super_block *sb,
> info->attr.mq_maxmsg = attr->mq_maxmsg;
> info->attr.mq_msgsize = attr->mq_msgsize;
> }
> - mq_msg_tblsz = info->attr.mq_maxmsg * sizeof(struct msg_msg *);
> - if (mq_msg_tblsz > PAGE_SIZE)
> - info->messages = vmalloc(mq_msg_tblsz);
> - else
> - info->messages = kmalloc(mq_msg_tblsz, GFP_KERNEL);
> - if (!info->messages)
> - goto out_inode;
> + /*
> + * We used to allocate a static array of pointers and account
> + * the size of that array as well as one msg_msg struct per
> + * possible message into the queue size. That's no longer
> + * accurate as the queue is now an rbtree and will grow and
> + * shrink depending on usage patterns. We can, however, still
> + * account one msg_msg struct per message, but the nodes are
> + * allocated depending on priority usage, and most programs
> + * only use one, or a handful, of priorities. However, since
> + * this is pinned memory, we need to assume worst case, so
> + * that means the min(mq_maxmsg, max_priorities) * struct
> + * posix_msg_tree_node.
> + */
> + mq_treesize = info->attr.mq_maxmsg * sizeof(struct msg_msg) +
> + min_t(unsigned int, info->attr.mq_maxmsg, MQ_PRIO_MAX) *
> + sizeof(struct posix_msg_tree_node);

"info->attr.mq_maxmsg" is a long, but the min_t() truncates it to an
unsigned int. I'm not familiar with this code so I don't know if
that's a problem...

We do the same thing in mqueue_evict_inode() and mq_attr_ok().

regards,
dan carpenter

2012-05-03 10:05:18

by Nicholas Piggin

Subject: Re: [Patch 1/4] ipc/mqueue: improve performance of send/recv

On 2 May 2012 03:50, Doug Ledford <[email protected]> wrote:

> Avg time to send/recv (in nanoseconds per message)
>  when queue empty            305/288                    349/318
>  when queue full (65528 messages)
>    constant priority      526589/823                    362/314
>    increasing priority    403105/916                    495/445
>    decreasing priority     73420/594                    482/409
>    random priority        280147/920                    546/436
>
> Time to fill/drain queue (65528 messages, in seconds)
>  constant priority         17.37/.12                    .13/.12
>  increasing priority        4.14/.14                    .21/.18
>  decreasing priority       12.93/.13                    .21/.18
>  random priority            8.88/.16                    .22/.17
>
> So, I think the results speak for themselves.  It's possible this
> implementation could be improved by caching at least one priority
> level in the node tree (that would bring the queue empty performance
> more in line with the old implementation), but this works and is *so*
> much better than what we had, especially for the common case of a
> single priority in use, that further refinements can be in follow on
> patches.

Nice work! Yeah I think if you cache a last unused entry, that
should mostly solve the empty queue regression.

I would imagine most users won't have huge queues, so the
empty case should be important too.
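
To make the suggestion concrete, a minimal user-space sketch of a one-slot node cache is below. This is only one way such a cache could look, not what the patch does; the struct and function names are hypothetical, and malloc/free stand in for the kernel allocator.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for a per-priority tree node. */
struct sketch_node {
	int priority;
};

/* Hypothetical queue state: at most one spare node is kept around. */
struct sketch_queue {
	struct sketch_node *spare;
};

/* Take the cached node if there is one, otherwise allocate a fresh one. */
static struct sketch_node *sketch_node_get(struct sketch_queue *q)
{
	struct sketch_node *n = q->spare;

	if (n) {
		q->spare = NULL;
		return n;
	}
	return malloc(sizeof(*n));
}

/* Keep one unused node for the next send; free any extras. */
static void sketch_node_put(struct sketch_queue *q, struct sketch_node *n)
{
	assert(n != NULL);
	if (!q->spare)
		q->spare = n;
	else
		free(n);
}
```

With a queue that keeps going empty, each send would then reuse the spare node instead of going back to the allocator, which is where the empty-queue cost presumably comes from.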

2012-05-03 13:03:59

by Doug Ledford

Subject: Re: [Patch 1/4] ipc/mqueue: improve performance of send/recv

On 5/3/2012 5:21 AM, Dan Carpenter wrote:
> On Tue, May 01, 2012 at 01:50:52PM -0400, Doug Ledford wrote:
>> @@ -150,16 +241,25 @@ static struct inode *mqueue_get_inode(struct super_block *sb,
>> info->attr.mq_maxmsg = attr->mq_maxmsg;
>> info->attr.mq_msgsize = attr->mq_msgsize;
>> }
>> - mq_msg_tblsz = info->attr.mq_maxmsg * sizeof(struct msg_msg *);
>> - if (mq_msg_tblsz > PAGE_SIZE)
>> - info->messages = vmalloc(mq_msg_tblsz);
>> - else
>> - info->messages = kmalloc(mq_msg_tblsz, GFP_KERNEL);
>> - if (!info->messages)
>> - goto out_inode;
>> + /*
>> + * We used to allocate a static array of pointers and account
>> + * the size of that array as well as one msg_msg struct per
>> + * possible message into the queue size. That's no longer
>> + * accurate as the queue is now an rbtree and will grow and
>> + * shrink depending on usage patterns. We can, however, still
>> + * account one msg_msg struct per message, but the nodes are
>> + * allocated depending on priority usage, and most programs
>> + * only use one, or a handful, of priorities. However, since
>> + * this is pinned memory, we need to assume worst case, so
>> + * that means the min(mq_maxmsg, max_priorities) * struct
>> + * posix_msg_tree_node.
>> + */
>> + mq_treesize = info->attr.mq_maxmsg * sizeof(struct msg_msg) +
>> + min_t(unsigned int, info->attr.mq_maxmsg, MQ_PRIO_MAX) *
>> + sizeof(struct posix_msg_tree_node);
>
> "info->attr.mq_maxmsg" is a long, but the min_t() truncates it to an
> unsigned int. I'm not familiar with this code so I don't know if
> that's a problem...

It's fine. We currently cap mq_maxmsg at a hard limit of 65536, and
MQ_PRIO_MAX is 32768, so both are well within the range where truncating
a long to unsigned int is harmless. In order for this to ever be a
problem, we would first have to change the accounting of mq bytes in the
user struct from a 32bit type to a 64bit type. As long as it's still
32 bits, and as long as mq_maxmsg * (sizeof(struct msg_msg) + mq_msgsize)
must fit within that 32bit type, we will never have an mq_maxmsg large
enough to truncate in this situation.

> We do the same thing in mqueue_evict_inode() and mq_attr_ok().

All of the math in here would need an audit if we increased the maximum
mq bytes from 32bit to 64bit.
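
A small user-space sketch of the truncation argument follows, for illustration only. The helper names are made up, int64_t stands in for a 64-bit long, unsigned int is assumed to be 32 bits, and the two limits are the values given above.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_HARD_MSGMAX 65536 /* hard cap on mq_maxmsg, as stated above */
#define SKETCH_MQ_PRIO_MAX 32768 /* MQ_PRIO_MAX, as stated above */

/* Mimics min_t(unsigned int, a, b): both operands are narrowed before comparing. */
static unsigned int sketch_min_uint(int64_t a, int64_t b)
{
	unsigned int ua = (unsigned int)a;
	unsigned int ub = (unsigned int)b;

	return ua < ub ? ua : ub;
}

/* The same minimum computed without narrowing. */
static int64_t sketch_min_wide(int64_t a, int64_t b)
{
	return a < b ? a : b;
}

/* Returns 1 when narrowing to unsigned int does not change the result. */
static int sketch_truncation_harmless(int64_t maxmsg)
{
	assert(maxmsg > 0);
	return (int64_t)sketch_min_uint(maxmsg, SKETCH_MQ_PRIO_MAX) ==
	       sketch_min_wide(maxmsg, SKETCH_MQ_PRIO_MAX);
}
```

Any value up to SKETCH_HARD_MSGMAX passes, while something like ((int64_t)1 << 32) + 5 would narrow to 5 and give the wrong minimum. That is the case that could only arise if the 32bit accounting limit were lifted.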


--
Doug Ledford <[email protected]>
GPG KeyID: 0E572FDD
http://people.redhat.com/dledford

Infiniband specific RPMs available at
http://people.redhat.com/dledford/Infiniband

