2022-10-10 17:51:08

by Kuniyuki Iwashima

Subject: [PATCH v1 net 0/3] soreuseport: Fix issues related to the faster selection algorithm.

setsockopt(SO_INCOMING_CPU) for UDP/TCP is broken since 4.5/4.6 due to
these commits:

* e32ea7e74727 ("soreuseport: fast reuseport UDP socket selection")
* c125e80b8868 ("soreuseport: fast reuseport TCP socket selection")

These commits introduced the O(1) socket selection algorithm and removed
the O(n) iteration over the list, but the new algorithm ignores the score
calculated by compute_score(). As a result, it caused two misbehaviours:

* Unconnected sockets receive packets sent to connected sockets
* SO_INCOMING_CPU does not work

The former is fixed by commit acdcecc61285 ("udp: correct reuseport
selection with connected sockets"), but it introduced a rare race,
which the first patch fixes. The second patch fixes the latter, and
the third adds a test for SO_INCOMING_CPU.


Kuniyuki Iwashima (3):
udp: Update reuse->has_conns under reuseport_lock.
soreuseport: Fix socket selection for SO_INCOMING_CPU.
selftest: Add test for SO_INCOMING_CPU.

include/net/sock_reuseport.h | 25 ++-
net/core/sock.c | 5 +-
net/core/sock_reuseport.c | 88 ++++++++++-
net/ipv4/datagram.c | 2 +-
net/ipv4/udp.c | 2 +-
net/ipv6/datagram.c | 2 +-
net/ipv6/udp.c | 2 +-
tools/testing/selftests/net/.gitignore | 1 +
tools/testing/selftests/net/Makefile | 1 +
tools/testing/selftests/net/so_incoming_cpu.c | 148 ++++++++++++++++++
10 files changed, 260 insertions(+), 16 deletions(-)
create mode 100644 tools/testing/selftests/net/so_incoming_cpu.c

--
2.30.2


2022-10-10 18:02:10

by Kuniyuki Iwashima

Subject: [PATCH v1 net 3/3] selftest: Add test for SO_INCOMING_CPU.

Some highly optimised applications use SO_INCOMING_CPU for efficiency,
but to avoid slowing down they do not verify with getsockopt() that it
works correctly. As a result, no one noticed that it had been broken for
years, so it is a good time to add a test to catch future regressions.

The test does

1) Create $(nproc) TCP listeners associated with each CPU.

2) Create 32 child sockets for each listener by calling
sched_setaffinity() for each CPU.

3) Check if accept()ed sockets' sk_incoming_cpu matches
listener's one.

If we see -EAGAIN, SO_INCOMING_CPU is broken. However, we might not see
any error even if it is broken; the kernel could miraculously distribute
all SYNs to the correct listeners. To make that unlikely, we must increase
the number of clients and CPUs to some extent, so the test requires
$(nproc) >= 2 and creates at least 64 sockets.

Test:
$ nproc
96
$ ./so_incoming_cpu

Before the previous patch:

# Starting 1 tests from 2 test cases.
# RUN so_incoming_cpu.test1 ...
# so_incoming_cpu.c:129:test1:Expected cpu (82) == i (0)
# test1: Test terminated by assertion
# FAIL so_incoming_cpu.test1
not ok 1 so_incoming_cpu.test1
# FAILED: 0 / 1 tests passed.
# Totals: pass:0 fail:1 xfail:0 xpass:0 skip:0 error:0

After:

# Starting 1 tests from 2 test cases.
# RUN so_incoming_cpu.test1 ...
# so_incoming_cpu.c:137:test1:SO_INCOMING_CPU is very likely to be working correctly with 3072 sockets.
# OK so_incoming_cpu.test1
ok 1 so_incoming_cpu.test1
# PASSED: 1 / 1 tests passed.
# Totals: pass:1 fail:0 xfail:0 xpass:0 skip:0 error:0

Signed-off-by: Kuniyuki Iwashima <[email protected]>
---
tools/testing/selftests/net/.gitignore | 1 +
tools/testing/selftests/net/Makefile | 1 +
tools/testing/selftests/net/so_incoming_cpu.c | 148 ++++++++++++++++++
3 files changed, 150 insertions(+)
create mode 100644 tools/testing/selftests/net/so_incoming_cpu.c

diff --git a/tools/testing/selftests/net/.gitignore b/tools/testing/selftests/net/.gitignore
index 3d7adee7a3e6..ff8807cc9c2e 100644
--- a/tools/testing/selftests/net/.gitignore
+++ b/tools/testing/selftests/net/.gitignore
@@ -25,6 +25,7 @@ rxtimestamp
sk_bind_sendto_listen
sk_connect_zero_addr
socket
+so_incoming_cpu
so_netns_cookie
so_txtime
stress_reuseport_listen
diff --git a/tools/testing/selftests/net/Makefile b/tools/testing/selftests/net/Makefile
index 2a6b0bc648c4..ba57e7e7dc86 100644
--- a/tools/testing/selftests/net/Makefile
+++ b/tools/testing/selftests/net/Makefile
@@ -70,6 +70,7 @@ TEST_PROGS += io_uring_zerocopy_tx.sh
TEST_GEN_FILES += bind_bhash
TEST_GEN_PROGS += sk_bind_sendto_listen
TEST_GEN_PROGS += sk_connect_zero_addr
+TEST_GEN_PROGS += so_incoming_cpu

TEST_FILES := settings

diff --git a/tools/testing/selftests/net/so_incoming_cpu.c b/tools/testing/selftests/net/so_incoming_cpu.c
new file mode 100644
index 000000000000..0ee0f2e393eb
--- /dev/null
+++ b/tools/testing/selftests/net/so_incoming_cpu.c
@@ -0,0 +1,148 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Copyright Amazon.com Inc. or its affiliates. */
+#define _GNU_SOURCE
+#include <sched.h>
+
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/sysinfo.h>
+
+#include "../kselftest_harness.h"
+
+#define CLIENT_PER_SERVER 32 /* More sockets, more reliable */
+#define NR_SERVER self->nproc
+#define NR_CLIENT (CLIENT_PER_SERVER * NR_SERVER)
+
+FIXTURE(so_incoming_cpu)
+{
+ int nproc;
+ int *servers;
+ union {
+ struct sockaddr addr;
+ struct sockaddr_in in_addr;
+ };
+ socklen_t addrlen;
+};
+
+FIXTURE_SETUP(so_incoming_cpu)
+{
+ self->nproc = get_nprocs();
+ ASSERT_LE(2, self->nproc);
+
+ self->servers = malloc(sizeof(int) * NR_SERVER);
+ ASSERT_NE(self->servers, NULL);
+
+ self->in_addr.sin_family = AF_INET;
+ self->in_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ self->in_addr.sin_port = htons(0);
+ self->addrlen = sizeof(struct sockaddr_in);
+}
+
+FIXTURE_TEARDOWN(so_incoming_cpu)
+{
+ int i;
+
+ for (i = 0; i < NR_SERVER; i++)
+ close(self->servers[i]);
+
+ free(self->servers);
+}
+
+void create_servers(struct __test_metadata *_metadata,
+ FIXTURE_DATA(so_incoming_cpu) *self)
+{
+ int i, fd, ret;
+
+ for (i = 0; i < NR_SERVER; i++) {
+ fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
+ ASSERT_NE(fd, -1);
+
+ ret = setsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &i, sizeof(int));
+ ASSERT_EQ(ret, 0);
+
+ ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &(int){1}, sizeof(int));
+ ASSERT_EQ(ret, 0);
+
+ ret = bind(fd, &self->addr, self->addrlen);
+ ASSERT_EQ(ret, 0);
+
+ if (i == 0) {
+ ret = getsockname(fd, &self->addr, &self->addrlen);
+ ASSERT_EQ(ret, 0);
+ }
+
+ /* We don't use CLIENT_PER_SERVER here not to block
+ * this test at connect() if SO_INCOMING_CPU is broken.
+ */
+ ret = listen(fd, NR_CLIENT);
+ ASSERT_EQ(ret, 0);
+
+ self->servers[i] = fd;
+ }
+}
+
+void create_clients(struct __test_metadata *_metadata,
+ FIXTURE_DATA(so_incoming_cpu) *self)
+{
+ cpu_set_t cpu_set;
+ int i, j, fd, ret;
+
+ for (i = 0; i < NR_SERVER; i++) {
+ CPU_ZERO(&cpu_set);
+
+ CPU_SET(i, &cpu_set);
+ ASSERT_EQ(CPU_COUNT(&cpu_set), 1);
+ ASSERT_NE(CPU_ISSET(i, &cpu_set), 0);
+
+ /* Make sure SYN will be processed on the i-th CPU
+ * and finally distributed to the i-th listener.
+ */
+ ret = sched_setaffinity(0, sizeof(cpu_set), &cpu_set);
+ ASSERT_EQ(ret, 0);
+
+ for (j = 0; j < CLIENT_PER_SERVER; j++) {
+ fd = socket(AF_INET, SOCK_STREAM, 0);
+ ASSERT_NE(fd, -1);
+
+ ret = connect(fd, &self->addr, self->addrlen);
+ ASSERT_EQ(ret, 0);
+
+ close(fd);
+ }
+ }
+}
+
+void verify_incoming_cpu(struct __test_metadata *_metadata,
+ FIXTURE_DATA(so_incoming_cpu) *self)
+{
+ int i, j, fd, cpu, ret, total = 0;
+ socklen_t len = sizeof(int);
+
+ for (i = 0; i < NR_SERVER; i++) {
+ for (j = 0; j < CLIENT_PER_SERVER; j++) {
+ /* If we see -EAGAIN here, SO_INCOMING_CPU is broken */
+ fd = accept(self->servers[i], &self->addr, &self->addrlen);
+ ASSERT_NE(fd, -1);
+
+ ret = getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len);
+ ASSERT_EQ(ret, 0);
+ ASSERT_EQ(cpu, i);
+
+ close(fd);
+ total++;
+ }
+ }
+
+ ASSERT_EQ(total, NR_CLIENT);
+ TH_LOG("SO_INCOMING_CPU is very likely to be "
+ "working correctly with %d sockets.", total);
+}
+
+TEST_F(so_incoming_cpu, test1)
+{
+ create_servers(_metadata, self);
+ create_clients(_metadata, self);
+ verify_incoming_cpu(_metadata, self);
+}
+
+TEST_HARNESS_MAIN
--
2.30.2

2022-10-10 18:23:35

by Kuniyuki Iwashima

Subject: [PATCH v1 net 2/3] soreuseport: Fix socket selection for SO_INCOMING_CPU.

Kazuho Oku reported that setsockopt(SO_INCOMING_CPU) does not work
with setsockopt(SO_REUSEPORT) for TCP since v4.6.

With the combination of SO_REUSEPORT and SO_INCOMING_CPU, we could
build a highly efficient server application.

setsockopt(SO_INCOMING_CPU) associates a CPU with a TCP listener
or UDP socket, and then incoming packets processed on the CPU will
likely be distributed to the socket. Technically, a socket could
even receive packets handled on another CPU if no sockets in the
reuseport group have the same CPU receiving the flow.

The logic exists in compute_score() so that a socket will get a higher
score if it has the same CPU as the flow. However, the score has been
ignored since the two cited commits, which introduced a faster socket
selection algorithm for SO_REUSEPORT.

This patch introduces a counter of sockets with SO_INCOMING_CPU in
a reuseport group to check if we should iterate all sockets to find
a proper one. We increment the counter when

* calling listen() if the socket has SO_INCOMING_CPU and SO_REUSEPORT

* enabling SO_INCOMING_CPU if the socket is in a reuseport group

Also, we decrement it when

* detaching a socket out of the group to apply SO_INCOMING_CPU to
migrated TCP requests

* disabling SO_INCOMING_CPU if the socket is in a reuseport group

When the counter reaches 0, we can get back to the O(1) selection
algorithm.
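
For illustration only (this is not part of the patch), the selection
rule described above can be modelled in user space roughly as follows.
The struct and function names (sketch_sock, sketch_group, sketch_select)
are hypothetical stand-ins for struct sock, struct sock_reuseport and
reuseport_select_sock_by_hash(); locking and READ_ONCE()/WRITE_ONCE()
are deliberately left out.

```c
#include <stddef.h>

/* Hypothetical user-space stand-in for struct sock. */
struct sketch_sock {
	int established;	/* models sk_state == TCP_ESTABLISHED */
	int incoming_cpu;	/* models sk_incoming_cpu, -1 when unset */
};

/* Hypothetical user-space stand-in for struct sock_reuseport. */
struct sketch_group {
	unsigned int incoming_cpu;	/* sockets with SO_INCOMING_CPU set */
	unsigned int num_socks;
	struct sketch_sock *socks;
};

/*
 * start is the hash-derived index (must be < num_socks) and cpu is the
 * CPU processing the packet.  With a zero counter the first usable
 * socket is returned at once; otherwise the group is scanned for a
 * socket bound to cpu, falling back to the first usable socket.
 */
static struct sketch_sock *sketch_select(const struct sketch_group *g,
					 unsigned int start, int cpu)
{
	struct sketch_sock *first_valid = NULL;
	unsigned int i = start;

	if (!g->num_socks)
		return NULL;

	do {
		struct sketch_sock *sk = &g->socks[i];

		if (!sk->established) {
			if (!g->incoming_cpu)
				return sk;	/* O(1) path */
			if (sk->incoming_cpu == cpu)
				return sk;	/* CPU match */
			if (!first_valid)
				first_valid = sk;
		}

		if (++i >= g->num_socks)
			i = 0;
	} while (i != start);

	return first_valid;
}
```

In this model a group whose counter is 0 never walks past the first
non-established socket, which is why the non-SO_INCOMING_CPU case is
expected to stay cheap.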

The overall changes are negligible for the non-SO_INCOMING_CPU case,
and the only notable thing is that we have to update sk_incoming_cpu
under reuseport_lock. Otherwise, the race below traps us in the O(n)
algorithm even after disabling SO_INCOMING_CPU for all sockets in the
group.

  cpu1 (setsockopt)               cpu2 (listen)
+-----------------+             +-------------+

lock_sock(sk1)                  lock_sock(sk2)

reuseport_incoming_cpu_update(sk, val)
.
|- spin_lock_bh(&reuseport_lock)
|
|  /* increment reuse->incoming_cpu, but
|   * sk1->sk_incoming_cpu is still -1.
|   */
|- __reuseport_incoming_cpu_inc(sk1, reuse)
|
|- spin_unlock_bh(&reuseport_lock)
|
|                               spin_lock_bh(&reuseport_lock)
|                               reuseport_grow(sk2, reuse)
|                               .
|                               |- more_socks_size = reuse->max_socks * 2U;
|                               |- if (more_socks_size > U16_MAX &&
|                               |       reuse->num_closed_socks)
|                               |  .
|                               |  `- __reuseport_detach_closed_sock(sk1, reuse)
|                               |     .
|                               |     `- reuseport_incoming_cpu_dec(sk1, reuse)
|                                        .
|                                        `- if (sk1->sk_incoming_cpu >= 0)
|                                           /* read shutdown()ed sk1's sk_incoming_cpu
|                                            * without lock_sock(), and ... do nothing!
`- WRITE_ONCE(sk1->sk_incoming_cpu, 0)       *
                                             * leak 1 count of reuse->incoming_cpu.
                                             */

                                spin_unlock_bh(&reuseport_lock)
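
As a rough single-threaded replay of the interleaving above (purely
illustrative, not kernel code), the leak can be reproduced with a small
model. The names leak_model, racy_update_begin(), racy_update_end() and
detach_closed() are hypothetical; they only mirror the order in which
the counter and sk_incoming_cpu are touched.

```c
/* Hypothetical model: one reuseport group counter and one socket. */
struct leak_model {
	unsigned int group_incoming_cpu;	/* models reuse->incoming_cpu */
	int sk_incoming_cpu;			/* models sk1->sk_incoming_cpu */
	int sk_in_group;			/* 1 while sk1 is attached */
};

/* cpu1, first half: the counter is bumped, the socket field is not yet written. */
static void racy_update_begin(struct leak_model *m, int val)
{
	if (m->sk_incoming_cpu < 0 && val >= 0)
		m->group_incoming_cpu++;
}

/* cpu1, second half: the socket field is written after the lock was dropped. */
static void racy_update_end(struct leak_model *m, int val)
{
	m->sk_incoming_cpu = val;
}

/* cpu2: reuseport_grow() detaches the closed socket from the group. */
static void detach_closed(struct leak_model *m)
{
	if (m->sk_incoming_cpu >= 0)
		m->group_incoming_cpu--;
	m->sk_in_group = 0;
}

/* The detach slips in between the two halves of the update. */
static unsigned int replay_race(void)
{
	struct leak_model m = { 0, -1, 1 };

	racy_update_begin(&m, 0);
	detach_closed(&m);	/* sees -1 and skips the decrement */
	racy_update_end(&m, 0);

	return m.group_incoming_cpu;
}

/* Both halves complete before the detach, as if done under one lock. */
static unsigned int replay_locked(void)
{
	struct leak_model m = { 0, -1, 1 };

	racy_update_begin(&m, 0);
	racy_update_end(&m, 0);
	detach_closed(&m);

	return m.group_incoming_cpu;
}
```

In this model replay_race() leaves the counter at 1 although no socket
remains in the group, while replay_locked() brings it back to 0.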

Fixes: e32ea7e74727 ("soreuseport: fast reuseport UDP socket selection")
Fixes: c125e80b8868 ("soreuseport: fast reuseport TCP socket selection")
Reported-by: Kazuho Oku <[email protected]>
Signed-off-by: Kuniyuki Iwashima <[email protected]>
---
include/net/sock_reuseport.h | 2 +
net/core/sock.c | 5 +-
net/core/sock_reuseport.c | 88 ++++++++++++++++++++++++++++++++++--
3 files changed, 89 insertions(+), 6 deletions(-)

diff --git a/include/net/sock_reuseport.h b/include/net/sock_reuseport.h
index fe9779e6d90f..d69fbea3d6cb 100644
--- a/include/net/sock_reuseport.h
+++ b/include/net/sock_reuseport.h
@@ -16,6 +16,7 @@ struct sock_reuseport {
u16 max_socks; /* length of socks */
u16 num_socks; /* elements in socks */
u16 num_closed_socks; /* closed elements in socks */
+ u16 incoming_cpu;
/* The last synq overflow event timestamp of this
* reuse->socks[] group.
*/
@@ -28,6 +29,7 @@ struct sock_reuseport {
struct sock *socks[]; /* array of sock pointers */
};

+void reuseport_incoming_cpu_update(struct sock *sk, int val);
extern int reuseport_alloc(struct sock *sk, bool bind_inany);
extern int reuseport_add_sock(struct sock *sk, struct sock *sk2,
bool bind_inany);
diff --git a/net/core/sock.c b/net/core/sock.c
index eeb6cbac6f49..ad67aba947e1 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -1436,7 +1436,10 @@ int sk_setsockopt(struct sock *sk, int level, int optname,
break;
}
case SO_INCOMING_CPU:
- WRITE_ONCE(sk->sk_incoming_cpu, val);
+ if (rcu_access_pointer(sk->sk_reuseport_cb))
+ reuseport_incoming_cpu_update(sk, val);
+ else
+ WRITE_ONCE(sk->sk_incoming_cpu, val);
break;

case SO_CNX_ADVICE:
diff --git a/net/core/sock_reuseport.c b/net/core/sock_reuseport.c
index 5daa1fa54249..6f5cda58b2d4 100644
--- a/net/core/sock_reuseport.c
+++ b/net/core/sock_reuseport.c
@@ -21,6 +21,64 @@ static DEFINE_IDA(reuseport_ida);
static int reuseport_resurrect(struct sock *sk, struct sock_reuseport *old_reuse,
struct sock_reuseport *reuse, bool bind_inany);

+static void __reuseport_incoming_cpu_inc(struct sock *sk, struct sock_reuseport *reuse)
+{
+ /* paired with READ_ONCE() in reuseport_select_sock_by_hash() */
+ WRITE_ONCE(reuse->incoming_cpu, reuse->incoming_cpu + 1);
+}
+
+static void __reuseport_incoming_cpu_dec(struct sock *sk, struct sock_reuseport *reuse)
+{
+ /* paired with READ_ONCE() in reuseport_select_sock_by_hash() */
+ WRITE_ONCE(reuse->incoming_cpu, reuse->incoming_cpu - 1);
+}
+
+static void reuseport_incoming_cpu_inc(struct sock *sk, struct sock_reuseport *reuse)
+{
+ if (sk->sk_incoming_cpu >= 0)
+ __reuseport_incoming_cpu_inc(sk, reuse);
+}
+
+static void reuseport_incoming_cpu_dec(struct sock *sk, struct sock_reuseport *reuse)
+{
+ if (sk->sk_incoming_cpu >= 0)
+ __reuseport_incoming_cpu_dec(sk, reuse);
+}
+
+void reuseport_incoming_cpu_update(struct sock *sk, int val)
+{
+ struct sock_reuseport *reuse;
+
+ spin_lock_bh(&reuseport_lock);
+ reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
+ lockdep_is_held(&reuseport_lock));
+
+ if (!reuse) {
+ /* reuseport_grow() has detached a shutdown()ed
+ * sk, and sk_state is TCP_CLOSE, so no one can
+ * read this sk_incoming_cpu concurrently.
+ */
+ sk->sk_incoming_cpu = val;
+ goto out;
+ }
+
+ /* This must be done under reuseport_lock to avoid a race with
+ * reuseport_grow(), which accesses sk->sk_incoming_cpu without
+ * lock_sock() when detaching a shutdown()ed sk.
+ *
+ * paired with READ_ONCE() in reuseport_select_sock_by_hash()
+ */
+ WRITE_ONCE(sk->sk_incoming_cpu, val);
+
+ if (sk->sk_incoming_cpu < 0 && val >= 0)
+ __reuseport_incoming_cpu_inc(sk, reuse);
+ else if (sk->sk_incoming_cpu >= 0 && val < 0)
+ __reuseport_incoming_cpu_dec(sk, reuse);
+
+out:
+ spin_unlock_bh(&reuseport_lock);
+}
+
static int reuseport_sock_index(struct sock *sk,
const struct sock_reuseport *reuse,
bool closed)
@@ -48,6 +106,7 @@ static void __reuseport_add_sock(struct sock *sk,
/* paired with smp_rmb() in reuseport_(select|migrate)_sock() */
smp_wmb();
reuse->num_socks++;
+ reuseport_incoming_cpu_inc(sk, reuse);
}

static bool __reuseport_detach_sock(struct sock *sk,
@@ -60,6 +119,7 @@ static bool __reuseport_detach_sock(struct sock *sk,

reuse->socks[i] = reuse->socks[reuse->num_socks - 1];
reuse->num_socks--;
+ reuseport_incoming_cpu_dec(sk, reuse);

return true;
}
@@ -70,6 +130,7 @@ static void __reuseport_add_closed_sock(struct sock *sk,
reuse->socks[reuse->max_socks - reuse->num_closed_socks - 1] = sk;
/* paired with READ_ONCE() in inet_csk_bind_conflict() */
WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks + 1);
+ reuseport_incoming_cpu_inc(sk, reuse);
}

static bool __reuseport_detach_closed_sock(struct sock *sk,
@@ -83,6 +144,7 @@ static bool __reuseport_detach_closed_sock(struct sock *sk,
reuse->socks[i] = reuse->socks[reuse->max_socks - reuse->num_closed_socks];
/* paired with READ_ONCE() in inet_csk_bind_conflict() */
WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks - 1);
+ reuseport_incoming_cpu_dec(sk, reuse);

return true;
}
@@ -150,6 +212,7 @@ int reuseport_alloc(struct sock *sk, bool bind_inany)
reuse->bind_inany = bind_inany;
reuse->socks[0] = sk;
reuse->num_socks = 1;
+ reuseport_incoming_cpu_inc(sk, reuse);
rcu_assign_pointer(sk->sk_reuseport_cb, reuse);

out:
@@ -193,6 +256,7 @@ static struct sock_reuseport *reuseport_grow(struct sock_reuseport *reuse)
more_reuse->reuseport_id = reuse->reuseport_id;
more_reuse->bind_inany = reuse->bind_inany;
more_reuse->has_conns = reuse->has_conns;
+ more_reuse->incoming_cpu = reuse->incoming_cpu;

memcpy(more_reuse->socks, reuse->socks,
reuse->num_socks * sizeof(struct sock *));
@@ -442,18 +506,32 @@ static struct sock *run_bpf_filter(struct sock_reuseport *reuse, u16 socks,
static struct sock *reuseport_select_sock_by_hash(struct sock_reuseport *reuse,
u32 hash, u16 num_socks)
{
+ struct sock *first_valid_sk = NULL;
int i, j;

i = j = reciprocal_scale(hash, num_socks);
- while (reuse->socks[i]->sk_state == TCP_ESTABLISHED) {
+ do {
+ struct sock *sk = reuse->socks[i];
+
+ if (sk->sk_state != TCP_ESTABLISHED) {
+ /* paired with WRITE_ONCE() in __reuseport_incoming_cpu_(inc|dec)() */
+ if (!READ_ONCE(reuse->incoming_cpu))
+ return sk;
+
+ /* paired with WRITE_ONCE() in reuseport_incoming_cpu_update() */
+ if (READ_ONCE(sk->sk_incoming_cpu) == raw_smp_processor_id())
+ return sk;
+
+ if (!first_valid_sk)
+ first_valid_sk = sk;
+ }
+
i++;
if (i >= num_socks)
i = 0;
- if (i == j)
- return NULL;
- }
+ } while (i != j);

- return reuse->socks[i];
+ return first_valid_sk;
}

/**
--
2.30.2

2022-10-11 11:38:57

by Paolo Abeni

Subject: Re: [PATCH v1 net 2/3] soreuseport: Fix socket selection for SO_INCOMING_CPU.

On Mon, 2022-10-10 at 10:43 -0700, Kuniyuki Iwashima wrote:
> Kazuho Oku reported that setsockopt(SO_INCOMING_CPU) does not work
> with setsockopt(SO_REUSEPORT) for TCP since v4.6.
>
> With the combination of SO_REUSEPORT and SO_INCOMING_CPU, we could
> build a highly efficient server application.
>
> setsockopt(SO_INCOMING_CPU) associates a CPU with a TCP listener
> or UDP socket, and then incoming packets processed on the CPU will
> likely be distributed to the socket. Technically, a socket could
> even receive packets handled on another CPU if no sockets in the
> reuseport group have the same CPU receiving the flow.
>
> The logic exists in compute_score() so that a socket will get a higher
> score if it has the same CPU as the flow. However, the score has been
> ignored since the two cited commits, which introduced a faster socket
> selection algorithm for SO_REUSEPORT.
>
> This patch introduces a counter of sockets with SO_INCOMING_CPU in
> a reuseport group to check if we should iterate all sockets to find
> a proper one. We increment the counter when
>
> * calling listen() if the socket has SO_INCOMING_CPU and SO_REUSEPORT
>
> * enabling SO_INCOMING_CPU if the socket is in a reuseport group
>
> Also, we decrement it when
>
> * detaching a socket out of the group to apply SO_INCOMING_CPU to
> migrated TCP requests
>
> * disabling SO_INCOMING_CPU if the socket is in a reuseport group
>
> When the counter reaches 0, we can get back to the O(1) selection
> algorithm.
>
> The overall changes are negligible for the non-SO_INCOMING_CPU case,
> and the only notable thing is that we have to update sk_incoming_cpu
> under reuseport_lock. Otherwise, the race below traps us in the O(n)
> algorithm even after disabling SO_INCOMING_CPU for all sockets in the
> group.
>
>   cpu1 (setsockopt)               cpu2 (listen)
> +-----------------+             +-------------+
>
> lock_sock(sk1)                  lock_sock(sk2)
>
> reuseport_incoming_cpu_update(sk, val)
> .
> |- spin_lock_bh(&reuseport_lock)
> |
> |  /* increment reuse->incoming_cpu, but
> |   * sk1->sk_incoming_cpu is still -1.
> |   */
> |- __reuseport_incoming_cpu_inc(sk1, reuse)
> |
> |- spin_unlock_bh(&reuseport_lock)
> |
> |                               spin_lock_bh(&reuseport_lock)
> |                               reuseport_grow(sk2, reuse)
> |                               .
> |                               |- more_socks_size = reuse->max_socks * 2U;
> |                               |- if (more_socks_size > U16_MAX &&
> |                               |       reuse->num_closed_socks)
> |                               |  .
> |                               |  `- __reuseport_detach_closed_sock(sk1, reuse)
> |                               |     .
> |                               |     `- reuseport_incoming_cpu_dec(sk1, reuse)
> |                                        .
> |                                        `- if (sk1->sk_incoming_cpu >= 0)
> |                                           /* read shutdown()ed sk1's sk_incoming_cpu
> |                                            * without lock_sock(), and ... do nothing!
> `- WRITE_ONCE(sk1->sk_incoming_cpu, 0)       *
>                                              * leak 1 count of reuse->incoming_cpu.
>                                              */
>
>                                 spin_unlock_bh(&reuseport_lock)
>
> Fixes: e32ea7e74727 ("soreuseport: fast reuseport UDP socket selection")
> Fixes: c125e80b8868 ("soreuseport: fast reuseport TCP socket selection")
> Reported-by: Kazuho Oku <[email protected]>
> Signed-off-by: Kuniyuki Iwashima <[email protected]>
> ---
> include/net/sock_reuseport.h | 2 +
> net/core/sock.c | 5 +-
> net/core/sock_reuseport.c | 88 ++++++++++++++++++++++++++++++++++--
> 3 files changed, 89 insertions(+), 6 deletions(-)
>
> diff --git a/include/net/sock_reuseport.h b/include/net/sock_reuseport.h
> index fe9779e6d90f..d69fbea3d6cb 100644
> --- a/include/net/sock_reuseport.h
> +++ b/include/net/sock_reuseport.h
> @@ -16,6 +16,7 @@ struct sock_reuseport {
> u16 max_socks; /* length of socks */
> u16 num_socks; /* elements in socks */
> u16 num_closed_socks; /* closed elements in socks */
> + u16 incoming_cpu;
> /* The last synq overflow event timestamp of this
> * reuse->socks[] group.
> */
> @@ -28,6 +29,7 @@ struct sock_reuseport {
> struct sock *socks[]; /* array of sock pointers */
> };
>
> +void reuseport_incoming_cpu_update(struct sock *sk, int val);
> extern int reuseport_alloc(struct sock *sk, bool bind_inany);
> extern int reuseport_add_sock(struct sock *sk, struct sock *sk2,
> bool bind_inany);
> diff --git a/net/core/sock.c b/net/core/sock.c
> index eeb6cbac6f49..ad67aba947e1 100644
> --- a/net/core/sock.c
> +++ b/net/core/sock.c
> @@ -1436,7 +1436,10 @@ int sk_setsockopt(struct sock *sk, int level, int optname,
> break;
> }
> case SO_INCOMING_CPU:
> - WRITE_ONCE(sk->sk_incoming_cpu, val);
> + if (rcu_access_pointer(sk->sk_reuseport_cb))
> + reuseport_incoming_cpu_update(sk, val);
> + else
> + WRITE_ONCE(sk->sk_incoming_cpu, val);

I would call the helper regardless of sk->sk_reuseport_cb and let it
do the correct thing; that would make the code simpler and possibly safer.

> break;
>
> case SO_CNX_ADVICE:
> diff --git a/net/core/sock_reuseport.c b/net/core/sock_reuseport.c
> index 5daa1fa54249..6f5cda58b2d4 100644
> --- a/net/core/sock_reuseport.c
> +++ b/net/core/sock_reuseport.c
> @@ -21,6 +21,64 @@ static DEFINE_IDA(reuseport_ida);
> static int reuseport_resurrect(struct sock *sk, struct sock_reuseport *old_reuse,
> struct sock_reuseport *reuse, bool bind_inany);
>
> +static void __reuseport_incoming_cpu_inc(struct sock *sk, struct sock_reuseport *reuse)
> +{
> + /* paired with READ_ONCE() in reuseport_select_sock_by_hash() */
> + WRITE_ONCE(reuse->incoming_cpu, reuse->incoming_cpu + 1);
> +}

I find this helper name confusing (and I'm also horrible at picking
good names). Perhaps
__reuseport_use_cpu_inc()/__reuseport_use_cpu_dec() ?!?
> +
> +static void __reuseport_incoming_cpu_dec(struct sock *sk, struct sock_reuseport *reuse)
> +{
> + /* paired with READ_ONCE() in reuseport_select_sock_by_hash() */
> + WRITE_ONCE(reuse->incoming_cpu, reuse->incoming_cpu - 1);
> +}
> +
> +static void reuseport_incoming_cpu_inc(struct sock *sk, struct sock_reuseport *reuse)
> +{
> + if (sk->sk_incoming_cpu >= 0)
> + __reuseport_incoming_cpu_inc(sk, reuse);
> +}
> +
> +static void reuseport_incoming_cpu_dec(struct sock *sk, struct sock_reuseport *reuse)
> +{
> + if (sk->sk_incoming_cpu >= 0)
> + __reuseport_incoming_cpu_dec(sk, reuse);
> +}
> +
> +void reuseport_incoming_cpu_update(struct sock *sk, int val)
> +{
> + struct sock_reuseport *reuse;
> +
> + spin_lock_bh(&reuseport_lock);
> + reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
> + lockdep_is_held(&reuseport_lock));
> +
> + if (!reuse) {
> + /* reuseport_grow() has detached a shutdown()ed
> + * sk, and sk_state is TCP_CLOSE, so no one can
> + * read this sk_incoming_cpu concurrently.
> + */
> + sk->sk_incoming_cpu = val;
> + goto out;
> + }
> +
> + /* This must be done under reuseport_lock to avoid a race with
> + * reuseport_grow(), which accesses sk->sk_incoming_cpu without
> + * lock_sock() when detaching a shutdown()ed sk.
> + *
> + * paired with READ_ONCE() in reuseport_select_sock_by_hash()
> + */
> + WRITE_ONCE(sk->sk_incoming_cpu, val);
> +
> + if (sk->sk_incoming_cpu < 0 && val >= 0)

I don't see how the above condition can be true given the previous
statement ?!?

Possibly you can use something like:

	old_sk_incoming_cpu = sk->sk_incoming_cpu;
	WRITE_ONCE(sk->sk_incoming_cpu, val);
	if (!reuse)
		goto out;

	if (old_sk_incoming_cpu < 0)
		reuseport_incoming_cpu_inc();

So that:
- you can additionally avoid the '__' helper variants
- there is a single write statement, with no need to optimize out the
  WRITE_ONCE in the !reuse corner case
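
A minimal user-space sketch of that ordering, for illustration only: the
old value is read before the write, and the counter is adjusted from the
old/new pair. The types upd_sock and upd_group and the function
sketch_incoming_cpu_update() are hypothetical, locking is omitted, and
the decrement branch is an assumption that mirrors the intent of the
patch rather than something spelled out in the suggestion above.

```c
#include <stddef.h>

/* Hypothetical stand-ins for struct sock and struct sock_reuseport. */
struct upd_sock {
	int incoming_cpu;		/* models sk_incoming_cpu, -1 when unset */
};

struct upd_group {
	unsigned int incoming_cpu;	/* sockets with SO_INCOMING_CPU set */
};

/* reuse may be NULL when the socket is not in a reuseport group. */
static void sketch_incoming_cpu_update(struct upd_sock *sk,
				       struct upd_group *reuse, int val)
{
	int old = sk->incoming_cpu;	/* read before it is overwritten */

	sk->incoming_cpu = val;		/* the single write */
	if (!reuse)
		return;

	if (old < 0 && val >= 0)
		reuse->incoming_cpu++;	/* option newly enabled */
	else if (old >= 0 && val < 0)
		reuse->incoming_cpu--;	/* option disabled (assumed branch) */
}
```

Because the comparison uses the saved old value, changing the CPU from
one valid value to another leaves the counter untouched.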

> + __reuseport_incoming_cpu_inc(sk, reuse);
> + else if (sk->sk_incoming_cpu >= 0 && val < 0)
> + __reuseport_incoming_cpu_dec(sk, reuse);
> +
> +out:
> + spin_unlock_bh(&reuseport_lock);
> +}
> +
> static int reuseport_sock_index(struct sock *sk,
> const struct sock_reuseport *reuse,
> bool closed)
> @@ -48,6 +106,7 @@ static void __reuseport_add_sock(struct sock *sk,
> /* paired with smp_rmb() in reuseport_(select|migrate)_sock() */
> smp_wmb();
> reuse->num_socks++;
> + reuseport_incoming_cpu_inc(sk, reuse);
> }
>
> static bool __reuseport_detach_sock(struct sock *sk,
> @@ -60,6 +119,7 @@ static bool __reuseport_detach_sock(struct sock *sk,
>
> reuse->socks[i] = reuse->socks[reuse->num_socks - 1];
> reuse->num_socks--;
> + reuseport_incoming_cpu_dec(sk, reuse);
>
> return true;
> }
> @@ -70,6 +130,7 @@ static void __reuseport_add_closed_sock(struct sock *sk,
> reuse->socks[reuse->max_socks - reuse->num_closed_socks - 1] = sk;
> /* paired with READ_ONCE() in inet_csk_bind_conflict() */
> WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks + 1);
> + reuseport_incoming_cpu_inc(sk, reuse);
> }
>
> static bool __reuseport_detach_closed_sock(struct sock *sk,
> @@ -83,6 +144,7 @@ static bool __reuseport_detach_closed_sock(struct sock *sk,
> reuse->socks[i] = reuse->socks[reuse->max_socks - reuse->num_closed_socks];
> /* paired with READ_ONCE() in inet_csk_bind_conflict() */
> WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks - 1);
> + reuseport_incoming_cpu_dec(sk, reuse);
>
> return true;
> }
> @@ -150,6 +212,7 @@ int reuseport_alloc(struct sock *sk, bool bind_inany)
> reuse->bind_inany = bind_inany;
> reuse->socks[0] = sk;
> reuse->num_socks = 1;
> + reuseport_incoming_cpu_inc(sk, reuse);
> rcu_assign_pointer(sk->sk_reuseport_cb, reuse);
>
> out:
> @@ -193,6 +256,7 @@ static struct sock_reuseport *reuseport_grow(struct sock_reuseport *reuse)
> more_reuse->reuseport_id = reuse->reuseport_id;
> more_reuse->bind_inany = reuse->bind_inany;
> more_reuse->has_conns = reuse->has_conns;
> + more_reuse->incoming_cpu = reuse->incoming_cpu;
>
> memcpy(more_reuse->socks, reuse->socks,
> reuse->num_socks * sizeof(struct sock *));
> @@ -442,18 +506,32 @@ static struct sock *run_bpf_filter(struct sock_reuseport *reuse, u16 socks,
> static struct sock *reuseport_select_sock_by_hash(struct sock_reuseport *reuse,
> u32 hash, u16 num_socks)
> {
> + struct sock *first_valid_sk = NULL;
> int i, j;
>
> i = j = reciprocal_scale(hash, num_socks);
> - while (reuse->socks[i]->sk_state == TCP_ESTABLISHED) {
> + do {
> + struct sock *sk = reuse->socks[i];
> +
> + if (sk->sk_state != TCP_ESTABLISHED) {
> + /* paired with WRITE_ONCE() in __reuseport_incoming_cpu_(inc|dec)() */
> + if (!READ_ONCE(reuse->incoming_cpu))
> + return sk;
> +
> + /* paired with WRITE_ONCE() in reuseport_incoming_cpu_update() */
> + if (READ_ONCE(sk->sk_incoming_cpu) == raw_smp_processor_id())
> + return sk;
> +
> + if (!first_valid_sk)
> + first_valid_sk = sk;
> + }
> +
> i++;
> if (i >= num_socks)
> i = 0;
> - if (i == j)
> - return NULL;
> - }
> + } while (i != j);
>
> - return reuse->socks[i];
> + return first_valid_sk;
> }
>
IMHO this looks a bit too complex and possibly dangerous for -net. Have
you considered a net-next target?

Thanks,

Paolo

2022-10-11 11:45:12

by Paolo Abeni

Subject: Re: [PATCH v1 net 3/3] selftest: Add test for SO_INCOMING_CPU.

On Mon, 2022-10-10 at 10:43 -0700, Kuniyuki Iwashima wrote:
> Some highly optimised applications use SO_INCOMING_CPU for efficiency,
> but to avoid slowing down they do not verify with getsockopt() that it
> works correctly. As a result, no one noticed that it had been broken for
> years, so it is a good time to add a test to catch future regressions.
>
> The test does
>
> 1) Create $(nproc) TCP listeners associated with each CPU.
>
> 2) Create 32 child sockets for each listener by calling
> sched_setaffinity() for each CPU.
>
> 3) Check if accept()ed sockets' sk_incoming_cpu matches
> listener's one.
>
> If we see -EAGAIN, SO_INCOMING_CPU is broken. However, we might not see
> any error even if it is broken; the kernel could miraculously distribute
> all SYNs to the correct listeners. To make that unlikely, we must increase
> the number of clients and CPUs to some extent, so the test requires
> $(nproc) >= 2 and creates at least 64 sockets.
>
> Test:
> $ nproc
> 96
> $ ./so_incoming_cpu
>
> Before the previous patch:
>
> # Starting 1 tests from 2 test cases.
> # RUN so_incoming_cpu.test1 ...
> # so_incoming_cpu.c:129:test1:Expected cpu (82) == i (0)
> # test1: Test terminated by assertion
> # FAIL so_incoming_cpu.test1
> not ok 1 so_incoming_cpu.test1
> # FAILED: 0 / 1 tests passed.
> # Totals: pass:0 fail:1 xfail:0 xpass:0 skip:0 error:0
>
> After:
>
> # Starting 1 tests from 2 test cases.
> # RUN so_incoming_cpu.test1 ...
> # so_incoming_cpu.c:137:test1:SO_INCOMING_CPU is very likely to be working correctly with 3072 sockets.
> # OK so_incoming_cpu.test1
> ok 1 so_incoming_cpu.test1
> # PASSED: 1 / 1 tests passed.
> # Totals: pass:1 fail:0 xfail:0 xpass:0 skip:0 error:0
>
> Signed-off-by: Kuniyuki Iwashima <[email protected]>
> ---
> tools/testing/selftests/net/.gitignore | 1 +
> tools/testing/selftests/net/Makefile | 1 +
> tools/testing/selftests/net/so_incoming_cpu.c | 148 ++++++++++++++++++
> 3 files changed, 150 insertions(+)
> create mode 100644 tools/testing/selftests/net/so_incoming_cpu.c
>
> diff --git a/tools/testing/selftests/net/.gitignore b/tools/testing/selftests/net/.gitignore
> index 3d7adee7a3e6..ff8807cc9c2e 100644
> --- a/tools/testing/selftests/net/.gitignore
> +++ b/tools/testing/selftests/net/.gitignore
> @@ -25,6 +25,7 @@ rxtimestamp
> sk_bind_sendto_listen
> sk_connect_zero_addr
> socket
> +so_incoming_cpu
> so_netns_cookie
> so_txtime
> stress_reuseport_listen
> diff --git a/tools/testing/selftests/net/Makefile b/tools/testing/selftests/net/Makefile
> index 2a6b0bc648c4..ba57e7e7dc86 100644
> --- a/tools/testing/selftests/net/Makefile
> +++ b/tools/testing/selftests/net/Makefile
> @@ -70,6 +70,7 @@ TEST_PROGS += io_uring_zerocopy_tx.sh
> TEST_GEN_FILES += bind_bhash
> TEST_GEN_PROGS += sk_bind_sendto_listen
> TEST_GEN_PROGS += sk_connect_zero_addr
> +TEST_GEN_PROGS += so_incoming_cpu
>
> TEST_FILES := settings
>
> diff --git a/tools/testing/selftests/net/so_incoming_cpu.c b/tools/testing/selftests/net/so_incoming_cpu.c
> new file mode 100644
> index 000000000000..0ee0f2e393eb
> --- /dev/null
> +++ b/tools/testing/selftests/net/so_incoming_cpu.c
> @@ -0,0 +1,148 @@
> +// SPDX-License-Identifier: GPL-2.0
> +/* Copyright Amazon.com Inc. or its affiliates. */
> +#define _GNU_SOURCE
> +#include <sched.h>
> +
> +#include <netinet/in.h>
> +#include <sys/socket.h>
> +#include <sys/sysinfo.h>
> +
> +#include "../kselftest_harness.h"
> +
> +#define CLIENT_PER_SERVER 32 /* More sockets, more reliable */
> +#define NR_SERVER self->nproc
> +#define NR_CLIENT (CLIENT_PER_SERVER * NR_SERVER)
> +
> +FIXTURE(so_incoming_cpu)
> +{
> + int nproc;
> + int *servers;
> + union {
> + struct sockaddr addr;
> + struct sockaddr_in in_addr;
> + };
> + socklen_t addrlen;
> +};
> +
> +FIXTURE_SETUP(so_incoming_cpu)
> +{
> + self->nproc = get_nprocs();
> + ASSERT_LE(2, self->nproc);
> +
> + self->servers = malloc(sizeof(int) * NR_SERVER);
> + ASSERT_NE(self->servers, NULL);
> +
> + self->in_addr.sin_family = AF_INET;
> + self->in_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
> + self->in_addr.sin_port = htons(0);
> + self->addrlen = sizeof(struct sockaddr_in);
> +}
> +
> +FIXTURE_TEARDOWN(so_incoming_cpu)
> +{
> + int i;
> +
> + for (i = 0; i < NR_SERVER; i++)
> + close(self->servers[i]);
> +
> + free(self->servers);
> +}
> +
> +void create_servers(struct __test_metadata *_metadata,
> + FIXTURE_DATA(so_incoming_cpu) *self)
> +{
> + int i, fd, ret;
> +
> + for (i = 0; i < NR_SERVER; i++) {
> + fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
> + ASSERT_NE(fd, -1);
> +
> + ret = setsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &i, sizeof(int));
> + ASSERT_EQ(ret, 0);
> +
> + ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &(int){1}, sizeof(int));
> + ASSERT_EQ(ret, 0);
> +
> + ret = bind(fd, &self->addr, self->addrlen);
> + ASSERT_EQ(ret, 0);
> +
> + if (i == 0) {
> + ret = getsockname(fd, &self->addr, &self->addrlen);
> + ASSERT_EQ(ret, 0);
> + }
> +
> + /* We don't use CLIENT_PER_SERVER here not to block
> + * this test at connect() if SO_INCOMING_CPU is broken.
> + */
> + ret = listen(fd, NR_CLIENT);
> + ASSERT_EQ(ret, 0);
> +
> + self->servers[i] = fd;
> + }
> +}
> +
> +void create_clients(struct __test_metadata *_metadata,
> + FIXTURE_DATA(so_incoming_cpu) *self)
> +{
> + cpu_set_t cpu_set;
> + int i, j, fd, ret;
> +
> + for (i = 0; i < NR_SERVER; i++) {
> + CPU_ZERO(&cpu_set);
> +
> + CPU_SET(i, &cpu_set);
> + ASSERT_EQ(CPU_COUNT(&cpu_set), 1);
> + ASSERT_NE(CPU_ISSET(i, &cpu_set), 0);
> +
> + /* Make sure SYN will be processed on the i-th CPU
> + * and finally distributed to the i-th listener.
> + */
> + ret = sched_setaffinity(0, sizeof(cpu_set), &cpu_set);
> + ASSERT_EQ(ret, 0);
> +
> + for (j = 0; j < CLIENT_PER_SERVER; j++) {
> + fd = socket(AF_INET, SOCK_STREAM, 0);
> + ASSERT_NE(fd, -1);
> +
> + ret = connect(fd, &self->addr, self->addrlen);
> + ASSERT_EQ(ret, 0);
> +
> + close(fd);
> + }
> + }
> +}
> +
> +void verify_incoming_cpu(struct __test_metadata *_metadata,
> + FIXTURE_DATA(so_incoming_cpu) *self)
> +{
> + int i, j, fd, cpu, ret, total = 0;
> + socklen_t len = sizeof(int);
> +
> + for (i = 0; i < NR_SERVER; i++) {
> + for (j = 0; j < CLIENT_PER_SERVER; j++) {
> + /* If we see -EAGAIN here, SO_INCOMING_CPU is broken */
> + fd = accept(self->servers[i], &self->addr, &self->addrlen);
> + ASSERT_NE(fd, -1);
> +
> + ret = getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len);
> + ASSERT_EQ(ret, 0);
> + ASSERT_EQ(cpu, i);
> +
> + close(fd);
> + total++;
> + }
> + }
> +
> + ASSERT_EQ(total, NR_CLIENT);
> + TH_LOG("SO_INCOMING_CPU is very likely to be "
> + "working correctly with %d sockets.", total);
> +}
> +
> +TEST_F(so_incoming_cpu, test1)
> +{
> + create_servers(_metadata, self);
> + create_clients(_metadata, self);
> + verify_incoming_cpu(_metadata, self);
> +}

I think it would be nicer if you could add more test-cases, covering
e.g.:
- setting SO_INCOMING_CPU after SO_REUSEPORT,
- initially including a socket without SO_INCOMING_CPU and then removing
it from the soreuseport set

Thanks,

Paolo

2022-10-11 15:45:39

by Kuniyuki Iwashima

[permalink] [raw]
Subject: Re: [PATCH v1 net 2/3] soreuseport: Fix socket selection for SO_INCOMING_CPU.

From: Paolo Abeni <[email protected]>
Date: Tue, 11 Oct 2022 13:28:11 +0200
> On Mon, 2022-10-10 at 10:43 -0700, Kuniyuki Iwashima wrote:
> > Kazuho Oku reported that setsockopt(SO_INCOMING_CPU) does not work
> > with setsockopt(SO_REUSEPORT) for TCP since v4.6.
> >
> > With the combination of SO_REUSEPORT and SO_INCOMING_CPU, we could
> > build a highly efficient server application.
> >
> > setsockopt(SO_INCOMING_CPU) associates a CPU with a TCP listener
> > or UDP socket, and then incoming packets processed on the CPU will
> > likely be distributed to the socket. Technically, a socket could
> > even receive packets handled on another CPU if no sockets in the
> > reuseport group have the same CPU receiving the flow.
> >
> > The logic exists in compute_score() so that a socket will get a higher
> > score if it has the same CPU with the flow. However, the score gets
> > ignored after the cited two commits, which introduced a faster socket
> > selection algorithm for SO_REUSEPORT.
> >
> > This patch introduces a counter of sockets with SO_INCOMING_CPU in
> > a reuseport group to check if we should iterate all sockets to find
> > a proper one. We increment the counter when
> >
> > * calling listen() if the socket has SO_INCOMING_CPU and SO_REUSEPORT
> >
> > * enabling SO_INCOMING_CPU if the socket is in a reuseport group
> >
> > Also, we decrement it when
> >
> > * detaching a socket out of the group to apply SO_INCOMING_CPU to
> > migrated TCP requests
> >
> > * disabling SO_INCOMING_CPU if the socket is in a reuseport group
> >
> > When the counter reaches 0, we can get back to the O(1) selection
> > algorithm.
> >
> > The overall changes are negligible for the non-SO_INCOMING_CPU case,
> > and the only notable thing is that we have to update sk_incoming_cpu
> > under reuseport_lock. Otherwise, the race below traps us in the O(n)
> > algorithm even after disabling SO_INCOMING_CPU for all sockets in the
> > group.
> >
> >   cpu1 (setsockopt)               cpu2 (listen)
> >   +-----------------+             +-------------+
> >
> >   lock_sock(sk1)                  lock_sock(sk2)
> >
> >   reuseport_incoming_cpu_update(sk, val)
> >   .
> >   |- spin_lock_bh(&reuseport_lock)
> >   |
> >   |  /* increment reuse->incoming_cpu, but
> >   |   * sk1->sk_incoming_cpu is still -1.
> >   |   */
> >   |- __reuseport_incoming_cpu_inc(sk1, reuse)
> >   |
> >   |- spin_unlock_bh(&reuseport_lock)
> >   |
> >   |                               spin_lock_bh(&reuseport_lock)
> >   |                               reuseport_grow(sk2, reuse)
> >   |                               .
> >   |                               |- more_socks_size = reuse->max_socks * 2U;
> >   |                               |- if (more_socks_size > U16_MAX &&
> >   |                               |       reuse->num_closed_socks)
> >   |                               |  .
> >   |                               |  `- __reuseport_detach_closed_sock(sk1, reuse)
> >   |                               |     .
> >   |                               |     `- reuseport_incoming_cpu_dec(sk1, reuse)
> >   |                               |        .
> >   |                               |        `- if (sk1->sk_incoming_cpu >= 0)
> >   |                               |           /* read shutdown()ed sk1's sk_incoming_cpu
> >   |                               |            * without lock_sock(), and ... do nothing!
> >   `- WRITE_ONCE(sk1->incoming_cpu, 0)          *
> >                                                * leak 1 count of reuse->incoming_cpu.
> >                                                */
> >
> >                                   spin_unlock_bh(&reuseport_lock)
> >
> > Fixes: e32ea7e74727 ("soreuseport: fast reuseport UDP socket selection")
> > Fixes: c125e80b8868 ("soreuseport: fast reuseport TCP socket selection")
> > Reported-by: Kazuho Oku <[email protected]>
> > Signed-off-by: Kuniyuki Iwashima <[email protected]>
> > ---
> > include/net/sock_reuseport.h | 2 +
> > net/core/sock.c | 5 +-
> > net/core/sock_reuseport.c | 88 ++++++++++++++++++++++++++++++++++--
> > 3 files changed, 89 insertions(+), 6 deletions(-)
> >
> > diff --git a/include/net/sock_reuseport.h b/include/net/sock_reuseport.h
> > index fe9779e6d90f..d69fbea3d6cb 100644
> > --- a/include/net/sock_reuseport.h
> > +++ b/include/net/sock_reuseport.h
> > @@ -16,6 +16,7 @@ struct sock_reuseport {
> > u16 max_socks; /* length of socks */
> > u16 num_socks; /* elements in socks */
> > u16 num_closed_socks; /* closed elements in socks */
> > + u16 incoming_cpu;
> > /* The last synq overflow event timestamp of this
> > * reuse->socks[] group.
> > */
> > @@ -28,6 +29,7 @@ struct sock_reuseport {
> > struct sock *socks[]; /* array of sock pointers */
> > };
> >
> > +void reuseport_incoming_cpu_update(struct sock *sk, int val);
> > extern int reuseport_alloc(struct sock *sk, bool bind_inany);
> > extern int reuseport_add_sock(struct sock *sk, struct sock *sk2,
> > bool bind_inany);
> > diff --git a/net/core/sock.c b/net/core/sock.c
> > index eeb6cbac6f49..ad67aba947e1 100644
> > --- a/net/core/sock.c
> > +++ b/net/core/sock.c
> > @@ -1436,7 +1436,10 @@ int sk_setsockopt(struct sock *sk, int level, int optname,
> > break;
> > }
> > case SO_INCOMING_CPU:
> > - WRITE_ONCE(sk->sk_incoming_cpu, val);
> > + if (rcu_access_pointer(sk->sk_reuseport_cb))
> > + reuseport_incoming_cpu_update(sk, val);
> > + else
> > + WRITE_ONCE(sk->sk_incoming_cpu, val);
>
> I would call the helper regardless of sk->sk_reuseport_cb and let it
> do the correct thing; that will make the code simpler and possibly safer.

I'll move the condition/WRITE_ONCE() into the helper.


> > break;
> >
> > case SO_CNX_ADVICE:
> > diff --git a/net/core/sock_reuseport.c b/net/core/sock_reuseport.c
> > index 5daa1fa54249..6f5cda58b2d4 100644
> > --- a/net/core/sock_reuseport.c
> > +++ b/net/core/sock_reuseport.c
> > @@ -21,6 +21,64 @@ static DEFINE_IDA(reuseport_ida);
> > static int reuseport_resurrect(struct sock *sk, struct sock_reuseport *old_reuse,
> > struct sock_reuseport *reuse, bool bind_inany);
> >
> > +static void __reuseport_incoming_cpu_inc(struct sock *sk, struct sock_reuseport *reuse)
> > +{
> > + /* paired with READ_ONCE() in reuseport_select_sock_by_hash() */
> > + WRITE_ONCE(reuse->incoming_cpu, reuse->incoming_cpu + 1);
> > +}
>
> I find this helper name confusing (and I'm also horrible at picking
> good names). Perhaps
> __reuseport_use_cpu_inc()/__reuseport_use_cpu_dec() ?!?

Yes, I'm bad at naming :)

Hmm... "use_cpu" sounds like something that is always true ("a socket
uses a CPU"). It would be good if the name could express "we have a
socket with a CPU specified", so __reuseport_(get|put)_cpu_specified ...?

But we usually use get/put for refcounting. Do you think it's a good
fit, or is it confusing?


> > +
> > +static void __reuseport_incoming_cpu_dec(struct sock *sk, struct sock_reuseport *reuse)
> > +{
> > + /* paired with READ_ONCE() in reuseport_select_sock_by_hash() */
> > + WRITE_ONCE(reuse->incoming_cpu, reuse->incoming_cpu - 1);
> > +}
> > +
> > +static void reuseport_incoming_cpu_inc(struct sock *sk, struct sock_reuseport *reuse)
> > +{
> > + if (sk->sk_incoming_cpu >= 0)
> > + __reuseport_incoming_cpu_inc(sk, reuse);
> > +}
> > +
> > +static void reuseport_incoming_cpu_dec(struct sock *sk, struct sock_reuseport *reuse)
> > +{
> > + if (sk->sk_incoming_cpu >= 0)
> > + __reuseport_incoming_cpu_dec(sk, reuse);
> > +}
> > +
> > +void reuseport_incoming_cpu_update(struct sock *sk, int val)
> > +{
> > + struct sock_reuseport *reuse;
> > +
> > + spin_lock_bh(&reuseport_lock);
> > + reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
> > + lockdep_is_held(&reuseport_lock));
> > +
> > + if (!reuse) {
> > + /* reuseport_grow() has detached a shutdown()ed
> > + * sk, and sk_state is TCP_CLOSE, so no one can
> > + * read this sk_incoming_cpu concurrently.
> > + */
> > + sk->sk_incoming_cpu = val;
> > + goto out;
> > + }
> > +
> > + /* This must be done under reuseport_lock to avoid a race with
> > + * reuseport_grow(), which accesses sk->sk_incoming_cpu without
> > + * lock_sock() when detaching a shutdown()ed sk.
> > + *
> > + * paired with READ_ONCE() in reuseport_select_sock_by_hash()
> > + */
> > + WRITE_ONCE(sk->sk_incoming_cpu, val);
> > +
> > + if (sk->sk_incoming_cpu < 0 && val >= 0)
>
> I don't see how the above condition can be true given the previous
> statement ?!?

Ah... sorry, at first the WRITE_ONCE() above was placed just before the
"out:" label below, but I moved it while writing the changelog so that
we won't publish an invalid state to the fast path:

1. slow path increments reuse->incoming_cpu before setting
sk->sk_incoming_cpu

2. fast path sees reuse->incoming_cpu >= 1, starts iterating, but
finds no socket with sk->sk_incoming_cpu set

3. slow path does WRITE_ONCE(sk->sk_incoming_cpu, val)
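
To make the ordering argument above concrete, here is a minimal user-space
sketch of the selection loop from the quoted patch. It is only a model under
stated assumptions, not kernel code: struct model_sock, struct model_group,
model_select() and the ST_* constants are hypothetical stand-ins, and the
locking and READ_ONCE()/WRITE_ONCE() annotations are left out.

```c
#include <stddef.h>

/* Hypothetical stand-ins for TCP_ESTABLISHED and TCP_LISTEN. */
#define ST_ESTABLISHED 1
#define ST_LISTEN 10

struct model_sock {
	int state;		/* ST_ESTABLISHED or ST_LISTEN */
	int incoming_cpu;	/* -1 means SO_INCOMING_CPU is not set */
};

struct model_group {
	unsigned int incoming_cpu;	/* number of sockets with a CPU set */
	unsigned int num_socks;		/* elements in socks */
	struct model_sock **socks;
};

/* Start at index 'start' (assumed < num_socks) and walk the array once.
 * With a zero counter, the first non-established socket is returned
 * immediately (the O(1) case). Otherwise prefer a socket whose CPU
 * matches 'this_cpu' and fall back to the first non-established one.
 */
static struct model_sock *model_select(const struct model_group *g,
					unsigned int start, int this_cpu)
{
	struct model_sock *first_valid = NULL;
	unsigned int i = start;

	do {
		struct model_sock *sk = g->socks[i];

		if (sk->state != ST_ESTABLISHED) {
			if (!g->incoming_cpu)
				return sk;

			if (sk->incoming_cpu == this_cpu)
				return sk;

			if (!first_valid)
				first_valid = sk;
		}

		if (++i >= g->num_socks)
			i = 0;
	} while (i != start);

	return first_valid;
}
```

In this model, a group whose counter is 1 while no socket has a CPU set
still gets a valid socket back, but only after walking the whole array,
which is the wasted work the reordering is meant to avoid.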


> Possibly you can use something like:
>
> old_sk_incoming_cpu = sk->sk_incoming_cpu
> WRITE_ONCE(sk->sk_incoming_cpu, val);
> if (!reuse)
> goto out;
>
> if (old_sk_incoming_cpu < 0)

Yes, we have to use the old value.

> reuseport_incoming_cpu_inc()
>
> So that:
> - can additionally avoid the '__' helper variants

But, we still need '__' helper to decrement the count if the change
is 1 -> -1.
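
As a rough illustration of where this discussion points, the sketch below
models an update helper that remembers the old value, performs a single
write, and handles the "no group" case itself. It is a user-space model
under assumptions, not the code from the next spin: struct cpu_group,
struct cpu_sock and cpu_sock_set_incoming_cpu() are hypothetical names,
and reuseport_lock and WRITE_ONCE() are omitted.

```c
#include <stddef.h>

struct cpu_group {
	unsigned int nr_cpu_socks;	/* sockets in the group with a CPU set */
};

struct cpu_sock {
	int incoming_cpu;		/* -1 means "no CPU specified" */
	struct cpu_group *group;	/* NULL if not in a reuseport group */
};

static void cpu_sock_set_incoming_cpu(struct cpu_sock *sk, int val)
{
	int old = sk->incoming_cpu;

	/* Single write, done whether or not the socket is in a group. */
	sk->incoming_cpu = val;

	if (!sk->group)
		return;

	/* Compare against the old value: only a transition between
	 * "unset" (< 0) and "set" (>= 0) changes the counter.
	 */
	if (old < 0 && val >= 0)
		sk->group->nr_cpu_socks++;
	else if (old >= 0 && val < 0)
		sk->group->nr_cpu_socks--;
}
```

Changing the CPU from one valid value to another leaves the counter
alone, and going from a valid CPU back to -1 is the case that still needs
an unconditional decrement.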


> - a single write statement, no need to optimize out the WRITE_ONCE in
> the !reuse corner case
>
> > + __reuseport_incoming_cpu_inc(sk, reuse);
> > + else if (sk->sk_incoming_cpu >= 0 && val < 0)
> > + __reuseport_incoming_cpu_dec(sk, reuse);
> > +
> > +out:
> > + spin_unlock_bh(&reuseport_lock);
> > +}
> > +
> > static int reuseport_sock_index(struct sock *sk,
> > const struct sock_reuseport *reuse,
> > bool closed)
> > @@ -48,6 +106,7 @@ static void __reuseport_add_sock(struct sock *sk,
> > /* paired with smp_rmb() in reuseport_(select|migrate)_sock() */
> > smp_wmb();
> > reuse->num_socks++;
> > + reuseport_incoming_cpu_inc(sk, reuse);
> > }
> >
> > static bool __reuseport_detach_sock(struct sock *sk,
> > @@ -60,6 +119,7 @@ static bool __reuseport_detach_sock(struct sock *sk,
> >
> > reuse->socks[i] = reuse->socks[reuse->num_socks - 1];
> > reuse->num_socks--;
> > + reuseport_incoming_cpu_dec(sk, reuse);
> >
> > return true;
> > }
> > @@ -70,6 +130,7 @@ static void __reuseport_add_closed_sock(struct sock *sk,
> > reuse->socks[reuse->max_socks - reuse->num_closed_socks - 1] = sk;
> > /* paired with READ_ONCE() in inet_csk_bind_conflict() */
> > WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks + 1);
> > + reuseport_incoming_cpu_inc(sk, reuse);
> > }
> >
> > static bool __reuseport_detach_closed_sock(struct sock *sk,
> > @@ -83,6 +144,7 @@ static bool __reuseport_detach_closed_sock(struct sock *sk,
> > reuse->socks[i] = reuse->socks[reuse->max_socks - reuse->num_closed_socks];
> > /* paired with READ_ONCE() in inet_csk_bind_conflict() */
> > WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks - 1);
> > + reuseport_incoming_cpu_dec(sk, reuse);
> >
> > return true;
> > }
> > @@ -150,6 +212,7 @@ int reuseport_alloc(struct sock *sk, bool bind_inany)
> > reuse->bind_inany = bind_inany;
> > reuse->socks[0] = sk;
> > reuse->num_socks = 1;
> > + reuseport_incoming_cpu_inc(sk, reuse);
> > rcu_assign_pointer(sk->sk_reuseport_cb, reuse);
> >
> > out:
> > @@ -193,6 +256,7 @@ static struct sock_reuseport *reuseport_grow(struct sock_reuseport *reuse)
> > more_reuse->reuseport_id = reuse->reuseport_id;
> > more_reuse->bind_inany = reuse->bind_inany;
> > more_reuse->has_conns = reuse->has_conns;
> > + more_reuse->incoming_cpu = reuse->incoming_cpu;
> >
> > memcpy(more_reuse->socks, reuse->socks,
> > reuse->num_socks * sizeof(struct sock *));
> > @@ -442,18 +506,32 @@ static struct sock *run_bpf_filter(struct sock_reuseport *reuse, u16 socks,
> > static struct sock *reuseport_select_sock_by_hash(struct sock_reuseport *reuse,
> > u32 hash, u16 num_socks)
> > {
> > + struct sock *first_valid_sk = NULL;
> > int i, j;
> >
> > i = j = reciprocal_scale(hash, num_socks);
> > - while (reuse->socks[i]->sk_state == TCP_ESTABLISHED) {
> > + do {
> > + struct sock *sk = reuse->socks[i];
> > +
> > + if (sk->sk_state != TCP_ESTABLISHED) {
> > + /* paired with WRITE_ONCE() in __reuseport_incoming_cpu_(inc|dec)() */
> > + if (!READ_ONCE(reuse->incoming_cpu))
> > + return sk;
> > +
> > + /* paired with WRITE_ONCE() in reuseport_incoming_cpu_update() */
> > + if (READ_ONCE(sk->sk_incoming_cpu) == raw_smp_processor_id())
> > + return sk;
> > +
> > + if (!first_valid_sk)
> > + first_valid_sk = sk;
> > + }
> > +
> > i++;
> > if (i >= num_socks)
> > i = 0;
> > - if (i == j)
> > - return NULL;
> > - }
> > + } while (i != j);
> >
> > - return reuse->socks[i];
> > + return first_valid_sk;
> > }
> >
> IMHO this looks a bit too complex and possibly dangerous for -net. Have
> you considered a net-next target?

I thought this was a regression and targeted -net, but considering no one
noticed it for so long, I'm ok with net-next.

Thank you!

2022-10-11 16:20:52

by Kuniyuki Iwashima

[permalink] [raw]
Subject: Re: [PATCH v1 net 3/3] selftest: Add test for SO_INCOMING_CPU.

From: Paolo Abeni <[email protected]>
Date: Tue, 11 Oct 2022 13:34:58 +0200
> On Mon, 2022-10-10 at 10:43 -0700, Kuniyuki Iwashima wrote:
> > Some highly optimised applications use SO_INCOMING_CPU to make them
> > efficient, but they didn't test if it's working correctly by getsockopt()
> > to avoid slowing down. As a result, no one noticed it had been broken
> > for years, so it's a good time to add a test to catch future regression.
> >
> > The test does
> >
> > 1) Create $(nproc) TCP listeners associated with each CPU.
> >
> > 2) Create 32 child sockets for each listener by calling
> > sched_setaffinity() for each CPU.
> >
> > 3) Check if accept()ed sockets' sk_incoming_cpu matches
> > listener's one.
> >
> > If we see -EAGAIN, SO_INCOMING_CPU is broken. However, we might not see
> > any error even if broken; the kernel could miraculously distribute all SYN
> > to correct listeners. Not to let that happen, we must increase the number
> > of clients and CPUs to some extent, so the test requires $(nproc) >= 2 and
> > creates 64 sockets at least.
> >
> > Test:
> > $ nproc
> > 96
> > $ ./so_incoming_cpu
> >
> > Before the previous patch:
> >
> > # Starting 1 tests from 2 test cases.
> > # RUN so_incoming_cpu.test1 ...
> > # so_incoming_cpu.c:129:test1:Expected cpu (82) == i (0)
> > # test1: Test terminated by assertion
> > # FAIL so_incoming_cpu.test1
> > not ok 1 so_incoming_cpu.test1
> > # FAILED: 0 / 1 tests passed.
> > # Totals: pass:0 fail:1 xfail:0 xpass:0 skip:0 error:0
> >
> > After:
> >
> > # Starting 1 tests from 2 test cases.
> > # RUN so_incoming_cpu.test1 ...
> > # so_incoming_cpu.c:137:test1:SO_INCOMING_CPU is very likely to be working correctly with 3072 sockets.
> > # OK so_incoming_cpu.test1
> > ok 1 so_incoming_cpu.test1
> > # PASSED: 1 / 1 tests passed.
> > # Totals: pass:1 fail:0 xfail:0 xpass:0 skip:0 error:0
> >
> > Signed-off-by: Kuniyuki Iwashima <[email protected]>
> > ---
> > tools/testing/selftests/net/.gitignore | 1 +
> > tools/testing/selftests/net/Makefile | 1 +
> > tools/testing/selftests/net/so_incoming_cpu.c | 148 ++++++++++++++++++
> > 3 files changed, 150 insertions(+)
> > create mode 100644 tools/testing/selftests/net/so_incoming_cpu.c
> >
> > diff --git a/tools/testing/selftests/net/.gitignore b/tools/testing/selftests/net/.gitignore
> > index 3d7adee7a3e6..ff8807cc9c2e 100644
> > --- a/tools/testing/selftests/net/.gitignore
> > +++ b/tools/testing/selftests/net/.gitignore
> > @@ -25,6 +25,7 @@ rxtimestamp
> > sk_bind_sendto_listen
> > sk_connect_zero_addr
> > socket
> > +so_incoming_cpu
> > so_netns_cookie
> > so_txtime
> > stress_reuseport_listen
> > diff --git a/tools/testing/selftests/net/Makefile b/tools/testing/selftests/net/Makefile
> > index 2a6b0bc648c4..ba57e7e7dc86 100644
> > --- a/tools/testing/selftests/net/Makefile
> > +++ b/tools/testing/selftests/net/Makefile
> > @@ -70,6 +70,7 @@ TEST_PROGS += io_uring_zerocopy_tx.sh
> > TEST_GEN_FILES += bind_bhash
> > TEST_GEN_PROGS += sk_bind_sendto_listen
> > TEST_GEN_PROGS += sk_connect_zero_addr
> > +TEST_GEN_PROGS += so_incoming_cpu
> >
> > TEST_FILES := settings
> >
> > diff --git a/tools/testing/selftests/net/so_incoming_cpu.c b/tools/testing/selftests/net/so_incoming_cpu.c
> > new file mode 100644
> > index 000000000000..0ee0f2e393eb
> > --- /dev/null
> > +++ b/tools/testing/selftests/net/so_incoming_cpu.c
> > @@ -0,0 +1,148 @@
> > +// SPDX-License-Identifier: GPL-2.0
> > +/* Copyright Amazon.com Inc. or its affiliates. */
> > +#define _GNU_SOURCE
> > +#include <sched.h>
> > +
> > +#include <netinet/in.h>
> > +#include <sys/socket.h>
> > +#include <sys/sysinfo.h>
> > +
> > +#include "../kselftest_harness.h"
> > +
> > +#define CLIENT_PER_SERVER 32 /* More sockets, more reliable */
> > +#define NR_SERVER self->nproc
> > +#define NR_CLIENT (CLIENT_PER_SERVER * NR_SERVER)
> > +
> > +FIXTURE(so_incoming_cpu)
> > +{
> > + int nproc;
> > + int *servers;
> > + union {
> > + struct sockaddr addr;
> > + struct sockaddr_in in_addr;
> > + };
> > + socklen_t addrlen;
> > +};
> > +
> > +FIXTURE_SETUP(so_incoming_cpu)
> > +{
> > + self->nproc = get_nprocs();
> > + ASSERT_LE(2, self->nproc);
> > +
> > + self->servers = malloc(sizeof(int) * NR_SERVER);
> > + ASSERT_NE(self->servers, NULL);
> > +
> > + self->in_addr.sin_family = AF_INET;
> > + self->in_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
> > + self->in_addr.sin_port = htons(0);
> > + self->addrlen = sizeof(struct sockaddr_in);
> > +}
> > +
> > +FIXTURE_TEARDOWN(so_incoming_cpu)
> > +{
> > + int i;
> > +
> > + for (i = 0; i < NR_SERVER; i++)
> > + close(self->servers[i]);
> > +
> > + free(self->servers);
> > +}
> > +
> > +void create_servers(struct __test_metadata *_metadata,
> > + FIXTURE_DATA(so_incoming_cpu) *self)
> > +{
> > + int i, fd, ret;
> > +
> > + for (i = 0; i < NR_SERVER; i++) {
> > + fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
> > + ASSERT_NE(fd, -1);
> > +
> > + ret = setsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &i, sizeof(int));
> > + ASSERT_EQ(ret, 0);
> > +
> > + ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &(int){1}, sizeof(int));
> > + ASSERT_EQ(ret, 0);
> > +
> > + ret = bind(fd, &self->addr, self->addrlen);
> > + ASSERT_EQ(ret, 0);
> > +
> > + if (i == 0) {
> > + ret = getsockname(fd, &self->addr, &self->addrlen);
> > + ASSERT_EQ(ret, 0);
> > + }
> > +
> > + /* We don't use CLIENT_PER_SERVER here not to block
> > + * this test at connect() if SO_INCOMING_CPU is broken.
> > + */
> > + ret = listen(fd, NR_CLIENT);
> > + ASSERT_EQ(ret, 0);
> > +
> > + self->servers[i] = fd;
> > + }
> > +}
> > +
> > +void create_clients(struct __test_metadata *_metadata,
> > + FIXTURE_DATA(so_incoming_cpu) *self)
> > +{
> > + cpu_set_t cpu_set;
> > + int i, j, fd, ret;
> > +
> > + for (i = 0; i < NR_SERVER; i++) {
> > + CPU_ZERO(&cpu_set);
> > +
> > + CPU_SET(i, &cpu_set);
> > + ASSERT_EQ(CPU_COUNT(&cpu_set), 1);
> > + ASSERT_NE(CPU_ISSET(i, &cpu_set), 0);
> > +
> > + /* Make sure SYN will be processed on the i-th CPU
> > + * and finally distributed to the i-th listener.
> > + */
> > + ret = sched_setaffinity(0, sizeof(cpu_set), &cpu_set);
> > + ASSERT_EQ(ret, 0);
> > +
> > + for (j = 0; j < CLIENT_PER_SERVER; j++) {
> > + fd = socket(AF_INET, SOCK_STREAM, 0);
> > + ASSERT_NE(fd, -1);
> > +
> > + ret = connect(fd, &self->addr, self->addrlen);
> > + ASSERT_EQ(ret, 0);
> > +
> > + close(fd);
> > + }
> > + }
> > +}
> > +
> > +void verify_incoming_cpu(struct __test_metadata *_metadata,
> > + FIXTURE_DATA(so_incoming_cpu) *self)
> > +{
> > + int i, j, fd, cpu, ret, total = 0;
> > + socklen_t len = sizeof(int);
> > +
> > + for (i = 0; i < NR_SERVER; i++) {
> > + for (j = 0; j < CLIENT_PER_SERVER; j++) {
> > + /* If we see -EAGAIN here, SO_INCOMING_CPU is broken */
> > + fd = accept(self->servers[i], &self->addr, &self->addrlen);
> > + ASSERT_NE(fd, -1);
> > +
> > + ret = getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len);
> > + ASSERT_EQ(ret, 0);
> > + ASSERT_EQ(cpu, i);
> > +
> > + close(fd);
> > + total++;
> > + }
> > + }
> > +
> > + ASSERT_EQ(total, NR_CLIENT);
> > + TH_LOG("SO_INCOMING_CPU is very likely to be "
> > + "working correctly with %d sockets.", total);
> > +}
> > +
> > +TEST_F(so_incoming_cpu, test1)
> > +{
> > + create_servers(_metadata, self);
> > + create_clients(_metadata, self);
> > + verify_incoming_cpu(_metadata, self);
> > +}
>
> I think it would be nicer if you could add more test-cases, covering
> e.g.:
> - setting SO_INCOMING_CPU after SO_REUSEPORT,
> - initially including a socket without SO_INCOMING_CPU and then removing
> it from the soreuseport set

I agree. Actually, I ran those tests with Python :)
I'll add the cases in the next spin.

Thank you.