Return-Path: Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1754810AbcL3XCv (ORCPT ); Fri, 30 Dec 2016 18:02:51 -0500 Received: from mo4-p00-ob.smtp.rzone.de ([81.169.146.162]:17094 "EHLO mo4-p00-ob.smtp.rzone.de" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1754584AbcL3XBN (ORCPT ); Fri, 30 Dec 2016 18:01:13 -0500 X-RZG-AUTH: :OH8QVVOrc/CP6za/qRmbF3BWedPGA1vjs2ejZCzW8NRdwTYefHi0L5RzHLEjAZn5asq7vKs= X-RZG-CLASS-ID: mo00 From: Thomas Schoebel-Theuer To: linux-kernel@vger.kernel.org, tst@schoebel-theuer.de Subject: [RFC 14/32] mars: add new module xio_net Date: Fri, 30 Dec 2016 23:57:40 +0100 Message-Id: <18b159b3c5443c41434668e4c700ddee4440bb41.1483138400.git.tst@schoebel-theuer.de> X-Mailer: git-send-email 2.11.0 In-Reply-To: References: In-Reply-To: References: Sender: linux-kernel-owner@vger.kernel.org List-ID: X-Mailing-List: linux-kernel@vger.kernel.org Content-Length: 54106 Lines: 2048 Signed-off-by: Thomas Schoebel-Theuer --- drivers/staging/mars/xio_bricks/xio_net.c | 1849 +++++++++++++++++++++++++++++ include/linux/xio/xio_net.h | 177 +++ 2 files changed, 2026 insertions(+) create mode 100644 drivers/staging/mars/xio_bricks/xio_net.c create mode 100644 include/linux/xio/xio_net.h diff --git a/drivers/staging/mars/xio_bricks/xio_net.c b/drivers/staging/mars/xio_bricks/xio_net.c new file mode 100644 index 000000000000..441eee1f3912 --- /dev/null +++ b/drivers/staging/mars/xio_bricks/xio_net.c @@ -0,0 +1,1849 @@ +/* + * MARS Long Distance Replication Software + * + * Copyright (C) 2010-2014 Thomas Schoebel-Theuer + * Copyright (C) 2011-2014 1&1 Internet AG + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include + +/******************************************************************/ + +/* provisionary version detection */ + +#ifndef TCP_MAX_REORDERING +#define __HAS_IOV_ITER +#endif + +#ifdef sk_net_refcnt +/* see eeb1bd5c40edb0e2fd925c8535e2fdebdbc5cef2 */ +#define __HAS_STRUCT_NET +#endif + +/******************************************************************/ + +#define USE_BUFFERING + +#define SEND_PROTO_VERSION 2 + +enum COMPRESS_TYPES { + COMPRESS_NONE = 0, + COMPRESS_LZO = 1, + /* insert further methods here */ +}; + +int xio_net_compress_data; + +const u16 net_global_flags = 0 +#ifdef __HAVE_LZO + | COMPRESS_LZO +#endif + ; + +/******************************************************************/ + +/* Internal data structures for low-level transfer of C structures + * described by struct meta. + * Only these low-level fields need to have a fixed size like s64. + * The size and bytesex of the higher-level C structures is converted + * automatically; therefore classical "int" or "long long" etc is viable. + */ + +#define MAX_FIELD_LEN (32 + 16) + +/* Please keep this at a size of 64 bytes by + * reuse of *spare* fields. + */ +struct xio_desc_cache { + u8 cache_sender_proto; + u8 cache_recver_proto; + s8 cache_is_bigendian; + u8 cache_spare0; + s16 cache_items; + u16 cache_spare1; + u32 cache_spare2; + u32 cache_spare3; + u64 cache_spare4[4]; + u64 cache_sender_cookie; + u64 cache_recver_cookie; +}; + +/* Please keep this also at a size of 64 bytes by + * reuse of *spare* fields. 
+ */ +struct xio_desc_item { + s8 field_type; + s8 field_spare0; + s16 field_data_size; + s16 field_sender_size; + s16 field_sender_offset; + s16 field_recver_size; + s16 field_recver_offset; + s32 field_spare; + char field_name[MAX_FIELD_LEN]; +}; + +/* This must not be mirror symmetric between big and little endian + */ +#define XIO_DESC_MAGIC 0x73D0A2EC6148F48Ell + +struct xio_desc_header { + u64 h_magic; + u64 h_cookie; + s16 h_meta_len; + s16 h_index; + u32 h_spare1; + u64 h_spare2; +}; + +#define MAX_INT_TRANSFER 16 + +/******************************************************************/ + +/* Bytesex conversion / sign extension + */ + +#ifdef __LITTLE_ENDIAN +static const bool myself_is_bigendian; + +#endif +#ifdef __BIG_ENDIAN +static const bool myself_is_bigendian = true; + +#endif + +static inline +void swap_bytes(void *data, int len) +{ + char *a = data; + char *b = data + len - 1; + + while (a < b) { + char tmp = *a; + + *a = *b; + *b = tmp; + a++; + b--; + } +} + +#define SWAP_FIELD(x) swap_bytes(&(x), sizeof(x)) + +static inline +void swap_mc(struct xio_desc_cache *mc, int len) +{ + struct xio_desc_item *mi; + + SWAP_FIELD(mc->cache_sender_cookie); + SWAP_FIELD(mc->cache_recver_cookie); + SWAP_FIELD(mc->cache_items); + + len -= sizeof(*mc); + + for (mi = (void *)(mc + 1); len > 0; mi++, len -= sizeof(*mi)) { + SWAP_FIELD(mi->field_data_size); + SWAP_FIELD(mi->field_sender_size); + SWAP_FIELD(mi->field_sender_offset); + SWAP_FIELD(mi->field_recver_size); + SWAP_FIELD(mi->field_recver_offset); + } +} + +static inline +char get_sign(const void *data, int len, bool is_bigendian, bool is_signed) +{ + if (is_signed) { + char x = is_bigendian ? 
+ ((const char *)data)[0] : + ((const char *)data)[len - 1]; + if (x < 0) + return -1; + } + return 0; +} + +/******************************************************************/ + +/* Low-level network traffic + */ + +int xio_net_default_port = CONFIG_MARS_DEFAULT_PORT; + +module_param_named(xio_port, xio_net_default_port, int, 0); + +int xio_net_bind_before_listen = 1; + +module_param_named(xio_net_bind_before_listen, xio_net_bind_before_listen, int, 0); + +int xio_net_bind_before_connect = 1; + +/* TODO: add authentication. + * TODO: add encryption. + */ + +struct xio_tcp_params repl_tcp_params = { + .ip_tos = IPTOS_LOWDELAY, + .tcp_window_size = 8 * 1024 * 1024, /* for long distance replications */ + .tcp_nodelay = 0, + .tcp_timeout = 2, + .tcp_keepcnt = 3, + .tcp_keepintvl = 3, /* keepalive ping time */ + .tcp_keepidle = 4, +}; + +struct xio_tcp_params device_tcp_params = { + .ip_tos = IPTOS_LOWDELAY, + .tcp_window_size = 2 * 1024 * 1024, + .tcp_nodelay = 1, + .tcp_timeout = 2, + .tcp_keepcnt = 3, + .tcp_keepintvl = 3, /* keepalive ping time */ + .tcp_keepidle = 4, +}; + +static char *id; + +char *my_id(void) +{ + struct new_utsname *u; + + if (!id) { + /* down_read(&uts_sem); // FIXME: this is currently not EXPORTed from the kernel! 
*/ + u = utsname(); + if (u) + id = brick_strdup(u->nodename); + /* up_read(&uts_sem); */ + } + return id; +} + +static +void __setsockopt(struct socket *sock, int level, int optname, char *optval, int optsize) +{ + int status = kernel_setsockopt(sock, level, optname, optval, optsize); + + if (status < 0) { + XIO_WRN( + "cannot set %d socket option %d to value %d, status = %d\n", + level, optname, *(int *)optval, status); + } +} + +#define _setsockopt(sock, level, optname, val) __setsockopt(sock, level, optname, (char *)&(val), sizeof(val)) + +int xio_create_sockaddr(struct sockaddr_storage *addr, const char *spec) +{ + struct sockaddr_in *sockaddr = (void *)addr; + const char *new_spec; + const char *tmp_spec; + int status = 0; + + memset(addr, 0, sizeof(*addr)); + sockaddr->sin_family = AF_INET; + sockaddr->sin_port = htons(xio_net_default_port); + + /* Try to translate hostnames to IPs if possible. + */ + if (xio_translate_hostname) + new_spec = xio_translate_hostname(spec); + else + new_spec = brick_strdup(spec); + tmp_spec = new_spec; + + /* This is PROVISIONARY! 
+ * TODO: add IPV6 syntax and many more features :) + */ + if (!*tmp_spec) + goto done; + if (*tmp_spec != ':') { + unsigned char u0 = 0, u1 = 0, u2 = 0, u3 = 0; + + status = sscanf(tmp_spec, "%hhu.%hhu.%hhu.%hhu", &u0, &u1, &u2, &u3); + if (status != 4) { + XIO_ERR("invalid sockaddr IP syntax '%s', status = %d\n", tmp_spec, status); + status = -EINVAL; + goto done; + } + XIO_DBG("decoded IP = %u.%u.%u.%u\n", u0, u1, u2, u3); + sockaddr->sin_addr.s_addr = (__be32)u0 | (__be32)u1 << 8 | (__be32)u2 << 16 | (__be32)u3 << 24; + } + /* decode port number (when present) */ + tmp_spec = spec; + while (*tmp_spec && *tmp_spec++ != ':') + ; /* empty */ + if (*tmp_spec) { + int port = 0; + + status = kstrtoint(tmp_spec, 10, &port); + if (unlikely(status)) { + XIO_ERR("invalid sockaddr PORT syntax '%s', status = %d\n", tmp_spec, status); + status = -EINVAL; + goto done; + } + XIO_DBG("decoded PORT = %d\n", port); + sockaddr->sin_port = htons(port); + } + status = 0; +done: + brick_string_free(new_spec); + return status; +} + +static int current_debug_nr; /* no locking, just for debugging */ + +static +void _set_socketopts(struct socket *sock, struct xio_tcp_params *params) +{ + struct timeval t = { + .tv_sec = params->tcp_timeout, + }; + int x_true = 1; + + /* TODO: improve this by a table-driven approach + */ + sock->sk->sk_sndtimeo = params->tcp_timeout * HZ; + sock->sk->sk_rcvtimeo = params->tcp_timeout * HZ; + sock->sk->sk_reuse = 1; + _setsockopt(sock, SOL_SOCKET, SO_SNDBUFFORCE, params->tcp_window_size); + _setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, params->tcp_window_size); + _setsockopt(sock, SOL_IP, SO_PRIORITY, params->ip_tos); + _setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, params->tcp_nodelay); + _setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, x_true); + _setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, params->tcp_keepcnt); + _setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, params->tcp_keepintvl); + _setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, params->tcp_keepidle); + 
_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, t); + _setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, t); + + if (sock->file) { /* switch back to blocking mode */ + sock->file->f_flags &= ~O_NONBLOCK; + } +} + +static int _xio_send_raw(struct xio_socket *msock, const void *buf, int len, int flags); +static int _xio_recv_raw(struct xio_socket *msock, void *buf, int minlen, int maxlen, int flags); + +static +void xio_proto_check(struct xio_socket *msock) +{ + u8 service_version = 0; + u16 service_flags = 0; + int status; + + status = _xio_recv_raw(msock, &service_version, 1, 1, 0); + if (unlikely(status < 0)) { + XIO_DBG( + "#%d protocol exchange failed at receiving, status = %d\n", + msock->s_debug_nr, + status); + goto out_return; + } + + /* take the minimum of both protocol versions */ + if (service_version > msock->s_send_proto) + service_version = msock->s_send_proto; + msock->s_send_proto = service_version; + + status = _xio_recv_raw(msock, &service_flags, 2, 2, 0); + if (unlikely(status < 0)) { + XIO_DBG( + "#%d protocol exchange failed at receiving, status = %d\n", + msock->s_debug_nr, + status); + goto out_return; + } + + msock->s_recv_flags = service_flags; +out_return:; +} + +static +int xio_proto_exchange(struct xio_socket *msock, const char *msg) +{ + int status; + + msock->s_send_proto = SEND_PROTO_VERSION; + status = xio_send_raw(msock, &msock->s_send_proto, 1, false); + if (unlikely(status < 0)) { + XIO_DBG( + "#%d protocol exchange on %s failed at sending, status = %d\n", + msock->s_debug_nr, + msg, + status); + goto done; + } + + msock->s_send_flags = net_global_flags; + status = xio_send_raw(msock, &msock->s_send_flags, 2, false); + if (unlikely(status < 0)) { + XIO_DBG( + "#%d flags exchange on %s failed at sending, status = %d\n", + msock->s_debug_nr, + msg, + status); + goto done; + } + +done: + return status; +} + +int xio_create_socket( +struct xio_socket *msock, +struct sockaddr_storage *src_addr, +struct sockaddr_storage *dst_addr, +struct 
xio_tcp_params *params) +{ + struct socket *sock; + struct sockaddr *src_sockaddr = (void *)src_addr; + struct sockaddr *dst_sockaddr = (void *)dst_addr; + int status = -EEXIST; + + if (unlikely(atomic_read(&msock->s_count))) { + XIO_ERR("#%d socket already in use\n", msock->s_debug_nr); + goto final; + } + if (unlikely(msock->s_socket)) { + XIO_ERR("#%d socket already open\n", msock->s_debug_nr); + goto final; + } + atomic_set(&msock->s_count, 1); + +#ifdef __HAS_STRUCT_NET + status = sock_create_kern(&init_net, AF_INET, SOCK_STREAM, IPPROTO_TCP, &msock->s_socket); +#else + status = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &msock->s_socket); +#endif + if (unlikely(status < 0 || !msock->s_socket)) { + msock->s_socket = NULL; + XIO_WRN("cannot create socket, status = %d\n", status); + goto final; + } + msock->s_debug_nr = ++current_debug_nr; + sock = msock->s_socket; + CHECK_PTR(sock, done); + msock->s_alive = true; + + _set_socketopts(sock, params); + + if (!dst_sockaddr) { /* we are server */ + struct sockaddr_in bind_addr; + + if (unlikely(!src_sockaddr)) { + XIO_ERR("no srcaddr given for bind()\n"); + status = -EINVAL; + goto done; + } + + memcpy(&bind_addr, src_sockaddr, sizeof(bind_addr)); + if (!xio_net_bind_before_listen) + memset(&bind_addr.sin_addr, 0, sizeof(bind_addr.sin_addr)); + + status = kernel_bind(sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr)); + if (unlikely(status < 0)) { + XIO_WRN("#%d bind failed, status = %d\n", msock->s_debug_nr, status); + goto done; + } + status = kernel_listen(sock, 16); + if (status < 0) + XIO_WRN("#%d listen failed, status = %d\n", msock->s_debug_nr, status); + } else { + /* When both src and dst are given, explicitly bind local address. + * Needed for multihomed hosts. 
+ */ + if (src_sockaddr && xio_net_bind_before_connect) { + struct sockaddr_in bind_addr; + + memcpy(&bind_addr, src_sockaddr, sizeof(bind_addr)); + bind_addr.sin_port = 0; + + status = kernel_bind(sock, (struct sockaddr *)&bind_addr, sizeof(struct sockaddr)); + if (unlikely(status < 0)) { + XIO_WRN( + "#%d bind before connect failed, ignored, status = %d\n", + msock->s_debug_nr, status); + } + } + + status = kernel_connect(sock, dst_sockaddr, sizeof(*dst_sockaddr), 0); + /* Treat non-blocking connects as successful. + * Any potential errors will show up later during traffic. + */ + if (status == -EINPROGRESS) { + XIO_DBG("#%d connect in progress\n", msock->s_debug_nr); + status = 0; + } + if (unlikely(status < 0)) { + XIO_DBG("#%d connect failed, status = %d\n", msock->s_debug_nr, status); + goto done; + } + status = xio_proto_exchange(msock, "connect"); + } + +done: + if (status < 0) + xio_put_socket(msock); + else + XIO_DBG("successfully created socket #%d\n", msock->s_debug_nr); +final: + return status; +} + +int xio_accept_socket(struct xio_socket *new_msock, struct xio_socket *old_msock, struct xio_tcp_params *params) +{ + int status = -ENOENT; + struct socket *new_socket = NULL; + bool ok; + + ok = xio_get_socket(old_msock); + if (likely(ok)) { + struct socket *sock = old_msock->s_socket; + + if (unlikely(!sock)) + goto err; + + status = kernel_accept(sock, &new_socket, O_NONBLOCK); + if (unlikely(status < 0)) + goto err; + if (unlikely(!new_socket)) { + status = -EBADF; + goto err; + } + + _set_socketopts(new_socket, params); + + memset(new_msock, 0, sizeof(struct xio_socket)); + new_msock->s_socket = new_socket; + atomic_set(&new_msock->s_count, 1); + new_msock->s_alive = true; + new_msock->s_debug_nr = ++current_debug_nr; + XIO_DBG("#%d successfully accepted socket #%d\n", old_msock->s_debug_nr, new_msock->s_debug_nr); + + status = xio_proto_exchange(new_msock, "accept"); +err: + xio_put_socket(old_msock); + } + return status; +} + +bool 
xio_get_socket(struct xio_socket *msock) +{ + if (unlikely(atomic_read(&msock->s_count) <= 0)) { + XIO_ERR("#%d bad nesting on msock = %p\n", msock->s_debug_nr, msock); + return false; + } + + atomic_inc(&msock->s_count); + + if (unlikely(!msock->s_socket || !msock->s_alive)) { + xio_put_socket(msock); + return false; + } + return true; +} + +void xio_put_socket(struct xio_socket *msock) +{ + if (unlikely(atomic_read(&msock->s_count) <= 0)) { + XIO_ERR("#%d bad nesting on msock = %p sock = %p\n", msock->s_debug_nr, msock, msock->s_socket); + } else if (atomic_dec_and_test(&msock->s_count)) { + struct socket *sock = msock->s_socket; + int i; + + XIO_DBG("#%d closing socket %p\n", msock->s_debug_nr, sock); + if (likely(sock && cmpxchg(&msock->s_alive, true, false))) + kernel_sock_shutdown(sock, SHUT_RDWR); + if (likely(sock && !msock->s_alive)) { + XIO_DBG("#%d releasing socket %p\n", msock->s_debug_nr, sock); + sock_release(sock); + } + for (i = 0; i < MAX_DESC_CACHE; i++) { + if (msock->s_desc_send[i]) + brick_block_free(msock->s_desc_send[i], PAGE_SIZE); + if (msock->s_desc_recv[i]) + brick_block_free(msock->s_desc_recv[i], PAGE_SIZE); + } + brick_block_free(msock->s_buffer, PAGE_SIZE); + memset(msock, 0, sizeof(struct xio_socket)); + } +} + +void xio_shutdown_socket(struct xio_socket *msock) +{ + if (msock->s_socket) { + bool ok = xio_get_socket(msock); + + if (likely(ok)) { + struct socket *sock = msock->s_socket; + + if (likely(sock && cmpxchg(&msock->s_alive, true, false))) { + XIO_DBG("#%d shutdown socket %p\n", msock->s_debug_nr, sock); + kernel_sock_shutdown(sock, SHUT_RDWR); + } + xio_put_socket(msock); + } + } +} + +bool xio_socket_is_alive(struct xio_socket *msock) +{ + bool res = false; + + if (!msock->s_socket || !msock->s_alive) + goto done; + if (unlikely(atomic_read(&msock->s_count) <= 0)) { + XIO_ERR("#%d bad nesting on msock = %p sock = %p\n", msock->s_debug_nr, msock, msock->s_socket); + goto done; + } + res = true; +done: + return res; +} + 
+long xio_socket_send_space_available(struct xio_socket *msock) +{ + struct socket *raw_sock = msock->s_socket; + long res = 0; + + if (!msock->s_alive || !raw_sock || !raw_sock->sk) + goto done; + if (unlikely(atomic_read(&msock->s_count) <= 0)) { + XIO_ERR("#%d bad nesting on msock = %p sock = %p\n", msock->s_debug_nr, msock, msock->s_socket); + goto done; + } + + res = raw_sock->sk->sk_sndbuf - raw_sock->sk->sk_wmem_queued; + if (res < 0) + res = 0; + res += msock->s_pos; + +done: + return res; +} + +static +int _xio_send_raw(struct xio_socket *msock, const void *buf, int len, int flags) +{ + int sleeptime = 1000 / HZ; + int sent = 0; + int status = 0; + + msock->s_send_cnt = 0; + while (len > 0) { + int this_len = len; + struct socket *sock = msock->s_socket; + + if (unlikely(!sock || !xio_net_is_alive || brick_thread_should_stop())) { + XIO_WRN("interrupting, sent = %d\n", sent); + status = -EIDRM; + break; + } + + { + struct kvec iov = { + .iov_base = (void *)buf, + .iov_len = this_len, + }; + struct msghdr msg = { +#ifndef __HAS_IOV_ITER + .msg_iov = (struct iovec *)&iov, +#endif + .msg_flags = 0 | MSG_NOSIGNAL, + }; + status = kernel_sendmsg(sock, &msg, &iov, 1, this_len); + } + + if (status == -EAGAIN) { + if (msock->s_send_abort > 0 && ++msock->s_send_cnt > msock->s_send_abort) { + XIO_WRN("#%d reached send abort %d\n", msock->s_debug_nr, msock->s_send_abort); + status = -EINTR; + break; + } + brick_msleep(sleeptime); + /* linearly increasing backoff */ + if (sleeptime < 100) + sleeptime += 1000 / HZ; + continue; + } + msock->s_send_cnt = 0; + if (unlikely(status == -EINTR)) { /* ignore it */ + flush_signals(current); + brick_msleep(50); + continue; + } + if (unlikely(!status)) { + XIO_WRN("#%d EOF from socket upon send_page()\n", msock->s_debug_nr); + brick_msleep(50); + status = -ECOMM; + break; + } + if (unlikely(status < 0)) { + XIO_WRN( + "#%d bad socket sendmsg, len=%d, this_len=%d, sent=%d, status = %d\n", + msock->s_debug_nr, + len, + this_len, + 
sent, + status); + break; + } + + len -= status; + buf += status; + sent += status; + sleeptime = 1000 / HZ; + } + + msock->s_send_bytes += sent; + if (status >= 0) + status = sent; + + return status; +} + +int xio_send_raw(struct xio_socket *msock, const void *buf, int len, bool cork) +{ +#ifdef USE_BUFFERING + int sent = 0; + int rest = len; + +#endif + int status = -EINVAL; + + if (!xio_get_socket(msock)) + goto final; + +#ifdef USE_BUFFERING +restart: + if (!msock->s_buffer) { + msock->s_pos = 0; + msock->s_buffer = brick_block_alloc(0, PAGE_SIZE); + } + + if (msock->s_pos + rest < PAGE_SIZE) { + memcpy(msock->s_buffer + msock->s_pos, buf, rest); + msock->s_pos += rest; + sent += rest; + rest = 0; + status = sent; + if (cork) + goto done; + } + + if (msock->s_pos > 0) { + status = _xio_send_raw(msock, msock->s_buffer, msock->s_pos, 0); + if (status < 0) + goto done; + + brick_block_free(msock->s_buffer, PAGE_SIZE); + msock->s_buffer = NULL; + msock->s_pos = 0; + } + + if (rest >= PAGE_SIZE) { + status = _xio_send_raw(msock, buf, rest, 0); + goto done; + } else if (rest > 0) { + goto restart; + } + status = sent; + +done: +#else + status = _xio_send_raw(msock, buf, len, 0); +#endif + if (status < 0 && msock->s_shutdown_on_err) + xio_shutdown_socket(msock); + + xio_put_socket(msock); + +final: + return status; +} + +/** + * xio_recv_raw() - Get [min, max] number of bytes + * @msock: socket to read from + * @buf: buffer to put the data in + * @minlen: minimum number of bytes to read + * @maxlen: maximum number of bytes to read + * + * Returns a negative error code or a number between [@minlen, @maxlen]. + * Short reads are mapped to an error. + * + * Hint: by setting @minlen to 1, you can read any number up to @maxlen. + * However, the most important use case is @minlen == @maxlen. + * + * Note: buf may be NULL. 
In this case, the data is simply consumed, + * like /dev/null + */ +static +int _xio_recv_raw(struct xio_socket *msock, void *buf, int minlen, int maxlen, int flags) +{ + void *dummy = NULL; + int sleeptime = 1000 / HZ; + int status = -EIDRM; + int done = 0; + + if (!buf) { + buf = brick_block_alloc(0, maxlen); + dummy = buf; + } + + if (!xio_get_socket(msock)) + goto final; + + if (minlen < maxlen) { + struct socket *sock = msock->s_socket; + + if (sock && sock->file) { + /* Use nonblocking reads to consume as much data + * as possible + */ + sock->file->f_flags |= O_NONBLOCK; + } + } + + msock->s_recv_cnt = 0; + while (done < minlen || (!minlen && !done)) { + struct kvec iov = { + .iov_base = buf + done, + .iov_len = maxlen - done, + }; + struct msghdr msg = { +#ifndef __HAS_IOV_ITER + .msg_iovlen = 1, + .msg_iov = (struct iovec *)&iov, + .msg_flags = flags | MSG_NOSIGNAL, +#endif + }; + struct socket *sock = msock->s_socket; + + if (unlikely(!sock)) { + XIO_WRN("#%d socket has disappeared\n", msock->s_debug_nr); + status = -EIDRM; + goto err; + } + + if (!xio_net_is_alive || brick_thread_should_stop()) { + XIO_WRN("#%d interrupting, done = %d\n", msock->s_debug_nr, done); + status = -EIDRM; + goto err; + } + + status = kernel_recvmsg(sock, &msg, &iov, 1, maxlen - done, msg.msg_flags); + + if (!xio_net_is_alive || brick_thread_should_stop()) { + XIO_WRN("#%d interrupting, done = %d\n", msock->s_debug_nr, done); + status = -EIDRM; + goto err; + } + + if (status == -EAGAIN) { + if (msock->s_recv_abort > 0 && ++msock->s_recv_cnt > msock->s_recv_abort) { + XIO_WRN("#%d reached recv abort %d\n", msock->s_debug_nr, msock->s_recv_abort); + status = -EINTR; + goto err; + } + brick_msleep(sleeptime); + if (minlen <= 0) + break; + /* linearly increasing backoff */ + if (sleeptime < 100) + sleeptime += 1000 / HZ; + continue; + } + msock->s_recv_cnt = 0; + if (!status) { /* EOF */ + XIO_WRN( + "#%d got EOF from socket (done=%d, req_size=%d)\n", msock->s_debug_nr, done, 
maxlen - done); + status = -EPIPE; + goto err; + } + if (status < 0) { + XIO_WRN("#%d bad recvmsg, status = %d\n", msock->s_debug_nr, status); + goto err; + } + done += status; + sleeptime = 1000 / HZ; + } + status = done; + msock->s_recv_bytes += done; + +err: + if (status < 0 && msock->s_shutdown_on_err) + xio_shutdown_socket(msock); + xio_put_socket(msock); +final: + if (dummy) + brick_block_free(dummy, maxlen); + return status; +} + +int xio_recv_raw(struct xio_socket *msock, void *buf, int minlen, int maxlen) +{ + /* Check the very first received byte for higher-level protocol + * information. This saves one ping-pong cycle at + * xio_proto_exchange() because the sender can immediately + * start sending bulk data without need to wait there. + * This is important for latency, thus we exceptionally break + * the layering hierarchy here. Also, we start sending at + * the lowest possible protocol version and may increase + * the protocol capabilities dynamically at runtime, + * sometime later. This bears some slight nondeterminism, + * but we take it into account for performance reasons. 
+ */ + if (unlikely(!msock->s_recv_bytes)) + xio_proto_check(msock); + + return _xio_recv_raw(msock, buf, minlen, maxlen, 0); +} + +int xio_send_compressed(struct xio_socket *msock, const void *buf, s32 len, int compress, bool cork) +{ + void *compr_data = NULL; + + s16 compr_code = 0; + int status; + + switch (compress) { + case COMPRESS_LZO: +#ifdef __HAVE_LZO + /* tolerate mixes of different proto versions */ + if (msock->s_send_proto >= 2 && (msock->s_recv_flags & COMPRESS_LZO)) { + size_t compr_len = 0; + int lzo_status; + void *wrkmem; + + compr_data = brick_mem_alloc(lzo1x_worst_compress(len)); + wrkmem = brick_mem_alloc(LZO1X_1_MEM_COMPRESS); + + lzo_status = lzo1x_1_compress(buf, len, compr_data, &compr_len, wrkmem); + + brick_mem_free(wrkmem); + if (likely(lzo_status == LZO_E_OK && compr_len < len)) { + compr_code = COMPRESS_LZO; + buf = compr_data; + len = compr_len; + } + } +#endif + break; + + /* implement further methods here */ + + default: + /* ignore unknown compress codes */ + break; + } + + /* allow mixing of different proto versions */ + if (likely(msock->s_send_proto >= 2)) { + status = xio_send_raw(msock, &compr_code, sizeof(compr_code), true); + if (unlikely(status < 0)) + goto done; + if (compr_code > 0) { + status = xio_send_raw(msock, &len, sizeof(len), true); + if (unlikely(status < 0)) + goto done; + } + } + + status = xio_send_raw(msock, buf, len, cork); + +done: + brick_mem_free(compr_data); + return status; +} + +int xio_recv_compressed(struct xio_socket *msock, void *buf, int minlen, int maxlen) +{ + void *compr_data = NULL; + + s16 compr_code = COMPRESS_NONE; + int status; + + /* allow mixing of different proto versions */ + if (msock->s_send_proto >= 2) { + status = xio_recv_raw(msock, &compr_code, sizeof(compr_code), sizeof(compr_code)); + if (unlikely(status < 0)) + goto done; + } + + switch (compr_code) { + case COMPRESS_NONE: + status = xio_recv_raw(msock, buf, minlen, maxlen); + break; + + case COMPRESS_LZO: +#ifdef __HAVE_LZO 
+ { + s32 compr_len = 0; + size_t this_len; + int lzo_status; + + status = xio_recv_raw(msock, &compr_len, sizeof(compr_len), sizeof(compr_len)); + if (unlikely(status < 0)) + goto done; + if (unlikely(compr_len <= 0 || compr_len >= maxlen)) { + XIO_ERR( + "bad comp_len = %d, real minlen = %d maxlen = %d\n", + compr_len, minlen, maxlen); + status = -EOVERFLOW; + goto done; + } + + compr_data = brick_mem_alloc(compr_len); + + status = xio_recv_raw(msock, compr_data, compr_len, compr_len); + if (unlikely(status < 0)) + goto done; + + this_len = maxlen; + lzo_status = lzo1x_decompress_safe(compr_data, compr_len, buf, &this_len); + + status = this_len; + if (unlikely(lzo_status != LZO_E_OK)) { + XIO_ERR("bad decompression, lzo_status = %d\n", lzo_status); + status = -EBADE; + goto done; + } + if (unlikely(this_len < minlen || this_len > maxlen)) { + XIO_WRN( + "bad decompression length this_len = %ld, minlen = %d maxlen = %d\n", ( + long)this_len, minlen, maxlen); + status = -EBADMSG; + goto done; + } + break; + } +#else + XIO_WRN("cannot LZO decompress\n"); + status = -EBADMSG; + break; +#endif + + /* implement further methods here */ + + default: + XIO_WRN("got unknown compr_code = %d\n", compr_code); + status = -EBADRQC; + } + +done: + brick_mem_free(compr_data); + return status; +} + +/*********************************************************************/ + +/* Mid-level field data exchange + */ + +static +void dump_meta(const struct meta *meta) +{ + int count = 0; + + for (; meta->field_name; meta++) { + XIO_ERR( + "%2d %4d %4d %4d %p '%s'\n", + meta->field_type, + meta->field_data_size, + meta->field_transfer_size, + meta->field_offset, + meta->field_ref, + meta->field_name); + count++; + } + XIO_ERR("-------- %d fields.\n", count); +} + +static +int _add_fields(struct xio_desc_item *mi, const struct meta *meta, int offset, const char *prefix, int maxlen) +{ + int count = 0; + + for (; meta->field_name; meta++) { + const char *new_prefix; + int new_offset; + int 
len; + + short this_size; + + new_prefix = mi->field_name; + new_offset = offset + meta->field_offset; + + if (unlikely(maxlen < sizeof(struct xio_desc_item))) { + XIO_ERR("desc cache item overflow\n"); + count = -1; + goto done; + } + + len = scnprintf(mi->field_name, MAX_FIELD_LEN, "%s.%s", prefix, meta->field_name); + if (unlikely(len >= MAX_FIELD_LEN)) { + XIO_ERR("field len overflow on '%s.%s'\n", prefix, meta->field_name); + count = -1; + goto done; + } + mi->field_type = meta->field_type; + this_size = meta->field_data_size; + mi->field_data_size = this_size; + mi->field_sender_size = this_size; + this_size = meta->field_transfer_size; + if (this_size > 0) + mi->field_sender_size = this_size; + mi->field_sender_offset = new_offset; + mi->field_recver_offset = -1; + + mi++; + maxlen -= sizeof(struct xio_desc_item); + count++; + + if (meta->field_type == FIELD_SUB) { + int sub_count; + + sub_count = _add_fields(mi, meta->field_ref, new_offset, new_prefix, maxlen); + if (sub_count < 0) + return sub_count; + + mi += sub_count; + count += sub_count; + maxlen -= sub_count * sizeof(struct xio_desc_item); + } + } +done: + return count; +} + +static +struct xio_desc_cache *make_sender_cache(struct xio_socket *msock, const struct meta *meta, int *cache_index) +{ + int orig_len = PAGE_SIZE; + int maxlen = orig_len; + struct xio_desc_cache *mc; + struct xio_desc_item *mi; + int i; + int status; + + for (i = 0; i < MAX_DESC_CACHE; i++) { + mc = msock->s_desc_send[i]; + if (!mc) + break; + if (mc->cache_sender_cookie == (u64)meta) + goto done; + } + + if (unlikely(i >= MAX_DESC_CACHE - 1)) { + XIO_ERR("#%d desc cache overflow\n", msock->s_debug_nr); + return NULL; + } + + mc = brick_block_alloc(0, maxlen); + + memset(mc, 0, maxlen); + mc->cache_sender_cookie = (u64)meta; + /* further bits may be used in future */ + mc->cache_sender_proto = msock->s_send_proto; + mc->cache_recver_proto = msock->s_recv_proto; + + maxlen -= sizeof(struct xio_desc_cache); + mi = (void *)(mc + 
1); + + status = _add_fields(mi, meta, 0, "", maxlen); + + if (likely(status > 0)) { + mc->cache_items = status; + mc->cache_is_bigendian = myself_is_bigendian; + msock->s_desc_send[i] = mc; + *cache_index = i; + } else { + brick_block_free(mc, orig_len); + mc = NULL; + } + +done: + return mc; +} + +static +int _make_recver_cache(struct xio_desc_cache *mc, const struct meta *meta, int offset, const char *prefix) +{ + char *tmp = brick_string_alloc(MAX_FIELD_LEN); + int count = 0; + int i; + + for (; meta->field_name; meta++, count++) { + snprintf(tmp, MAX_FIELD_LEN, "%s.%s", prefix, meta->field_name); + for (i = 0; i < mc->cache_items; i++) { + struct xio_desc_item *mi = ((struct xio_desc_item *)(mc + 1)) + i; + + if (meta->field_type == mi->field_type && + !strcmp(tmp, mi->field_name)) { + mi->field_recver_size = meta->field_data_size; + mi->field_recver_offset = offset + meta->field_offset; + if (meta->field_type == FIELD_SUB) { + int sub_count = _make_recver_cache( + + mc, meta->field_ref, mi->field_recver_offset, tmp); + if (unlikely(sub_count <= 0)) { + count = 0; + goto done; + } + } + goto found; + } + } + if (unlikely(!count)) { + XIO_ERR("field '%s' is missing\n", meta->field_name); + goto done; + } + XIO_WRN("field %2d '%s' is missing\n", count, meta->field_name); +found:; + } +done: + brick_string_free(tmp); + return count; +} + +static +int make_recver_cache(struct xio_desc_cache *mc, const struct meta *meta) +{ + int count; + int i; + + mc->cache_recver_cookie = (u64)meta; + count = _make_recver_cache(mc, meta, 0, ""); + + for (i = 0; i < mc->cache_items; i++) { + struct xio_desc_item *mi = ((struct xio_desc_item *)(mc + 1)) + i; + + if (unlikely(mi->field_recver_offset < 0)) + XIO_WRN("field '%s' is not transferred\n", mi->field_name); + } + return count; +} + +#define _CHECK_STATUS(_txt_) \ +do { \ + if (unlikely(status < 0)) { \ + XIO_DBG("%s status = %d\n", _txt_, status); \ + goto err; \ + } \ +} while (0) + +static +int _desc_send_item( +struct 
xio_socket *msock, const void *data, const struct xio_desc_cache *mc, int index, bool cork) +{ + struct xio_desc_item *mi = ((struct xio_desc_item *)(mc + 1)) + index; + const void *item = data + mi->field_sender_offset; + + s16 data_len = mi->field_data_size; + s16 transfer_len = mi->field_sender_size; + int status; + bool is_signed = false; + int res = -1; + + switch (mi->field_type) { + case FIELD_REF: + XIO_ERR("field '%s' NYI type = %d\n", mi->field_name, mi->field_type); + goto err; + case FIELD_SUB: + /* skip this */ + res = 0; + break; + case FIELD_INT: + is_signed = true; + /* fallthrough */ + case FIELD_UINT: + if (unlikely(data_len <= 0 || data_len > MAX_INT_TRANSFER)) { + XIO_ERR("field '%s' bad data_len = %d\n", mi->field_name, data_len); + goto err; + } + if (unlikely(transfer_len > MAX_INT_TRANSFER)) { + XIO_ERR("field '%s' bad transfer_len = %d\n", mi->field_name, transfer_len); + goto err; + } + + if (likely(data_len == transfer_len)) + goto raw; + + if (transfer_len > data_len) { + int diff = transfer_len - data_len; + char empty[diff]; + char sign; + + sign = get_sign(item, data_len, myself_is_bigendian, is_signed); + memset(empty, sign, diff); + + if (myself_is_bigendian) { + status = xio_send_raw(msock, empty, diff, true); + _CHECK_STATUS("send_diff"); + status = xio_send_raw(msock, item, data_len, cork); + _CHECK_STATUS("send_item"); + } else { + status = xio_send_raw(msock, item, data_len, true); + _CHECK_STATUS("send_item"); + status = xio_send_raw(msock, empty, diff, cork); + _CHECK_STATUS("send_diff"); + } + + res = data_len; + break; + } else if (unlikely(transfer_len <= 0)) { + XIO_ERR("bad transfer_len = %d\n", transfer_len); + goto err; + } else { /* transfer_len < data_len */ + char check = get_sign(item, data_len, myself_is_bigendian, is_signed); + int start; + int end; + int i; + + if (is_signed && + unlikely(get_sign(item, transfer_len, myself_is_bigendian, true) != check)) { + XIO_ERR( + "cannot sign-reduce signed integer from %d 
to %d bytes, byte %d !~ %d\n", + data_len, + transfer_len, + ((char *)item)[transfer_len - 1], + check); + goto err; + } + + if (myself_is_bigendian) { + start = 0; + end = data_len - transfer_len; + } else { + start = transfer_len; + end = data_len; + } + + for (i = start; i < end; i++) { + if (unlikely(((char *)item)[i] != check)) { + XIO_ERR( + "cannot sign-reduce %ssigned integer from %d to %d bytes at pos %d, byte %d != %d\n", + is_signed ? "" : "un", + data_len, + transfer_len, + i, + ((char *)item)[i], + check); + goto err; + } + } + + /* just omit the higher/lower bytes */ + data_len = transfer_len; + if (myself_is_bigendian) + item += end; + goto raw; + } + case FIELD_STRING: + item = *(void **)item; + data_len = 0; + if (item) + data_len = strlen(item) + 1; + + status = xio_send_raw(msock, &data_len, sizeof(data_len), true); + _CHECK_STATUS("send_string_len"); + /* fallthrough */ + case FIELD_RAW: +raw: + if (unlikely(data_len < 0)) { + XIO_ERR("field '%s' bad data_len = %d\n", mi->field_name, data_len); + goto err; + } + status = xio_send_raw(msock, item, data_len, cork); + _CHECK_STATUS("send_raw"); + res = data_len; + break; + default: + XIO_ERR("field '%s' unknown type = %d\n", mi->field_name, mi->field_type); + } +err: + return res; +} + +static +int _desc_recv_item(struct xio_socket *msock, void *data, const struct xio_desc_cache *mc, int index, int line) +{ + struct xio_desc_item *mi = ((struct xio_desc_item *)(mc + 1)) + index; + void *item = NULL; + + s16 data_len = mi->field_recver_size; + s16 transfer_len = mi->field_sender_size; + int status; + bool is_signed = false; + int res = -1; + + if (likely(data && data_len > 0 && mi->field_recver_offset >= 0)) + item = data + mi->field_recver_offset; + + switch (mi->field_type) { + case FIELD_REF: + XIO_ERR("field '%s' NYI type = %d\n", mi->field_name, mi->field_type); + goto err; + case FIELD_SUB: + /* skip this */ + res = 0; + break; + case FIELD_INT: + is_signed = true; + /* fallthrough */ + case 
FIELD_UINT: + if (unlikely(data_len <= 0 || data_len > MAX_INT_TRANSFER)) { + XIO_ERR("field '%s' bad data_len = %d\n", mi->field_name, data_len); + goto err; + } + if (unlikely(transfer_len > MAX_INT_TRANSFER)) { + XIO_ERR("field '%s' bad transfer_len = %d\n", mi->field_name, transfer_len); + goto err; + } + + if (likely(data_len == transfer_len)) + goto raw; + + if (transfer_len > data_len) { + int diff = transfer_len - data_len; + char empty[diff]; + char check; + + memset(empty, 0, diff); + + if (myself_is_bigendian) { + status = xio_recv_raw(msock, empty, diff, diff); + _CHECK_STATUS("recv_diff"); + } + + status = xio_recv_raw(msock, item, data_len, data_len); + _CHECK_STATUS("recv_item"); + if (unlikely(mc->cache_is_bigendian != myself_is_bigendian && item)) + swap_bytes(item, data_len); + + if (!myself_is_bigendian) { + status = xio_recv_raw(msock, empty, diff, diff); + _CHECK_STATUS("recv_diff"); + } + + /* check that sign extension did no harm */ + check = get_sign(empty, diff, mc->cache_is_bigendian, is_signed); + while (--diff >= 0) { + if (unlikely(empty[diff] != check)) { + XIO_ERR( + "field '%s' %sSIGNED INTEGER OVERFLOW on size reduction from %d to %d, byte %d != %d\n", + mi->field_name, + is_signed ? 
"" : "UN", + transfer_len, + data_len, + empty[diff], + check); + goto err; + } + } + if (is_signed && item && + unlikely(get_sign(item, data_len, myself_is_bigendian, true) != check)) { + XIO_ERR( + "field '%s' SIGNED INTEGER OVERLOW on reduction from size %d to %d, byte %d !~ %d\n", + mi->field_name, + transfer_len, + data_len, + ((char *)item)[data_len - 1], + check); + goto err; + } + + res = data_len; + break; + } else if (unlikely(transfer_len <= 0)) { + XIO_ERR("field '%s' bad transfer_len = %d\n", mi->field_name, transfer_len); + goto err; + } else if (unlikely(!item)) { /* shortcut without checks */ + data_len = transfer_len; + goto raw; + } else { /* transfer_len < data_len */ + int diff = data_len - transfer_len; + char *transfer_ptr = item; + char sign; + + if (myself_is_bigendian) + transfer_ptr += diff; + + status = xio_recv_raw(msock, transfer_ptr, transfer_len, transfer_len); + _CHECK_STATUS("recv_transfer"); + if (unlikely(mc->cache_is_bigendian != myself_is_bigendian)) + swap_bytes(transfer_ptr, transfer_len); + + /* sign-extend from transfer_len to data_len */ + sign = get_sign(transfer_ptr, transfer_len, myself_is_bigendian, is_signed); + if (myself_is_bigendian) + memset(item, sign, diff); + else + memset(item + transfer_len, sign, diff); + res = data_len; + break; + } + case FIELD_STRING: + data_len = 0; + status = xio_recv_raw(msock, &data_len, sizeof(data_len), sizeof(data_len)); + _CHECK_STATUS("recv_string_len"); + + if (unlikely(mc->cache_is_bigendian != myself_is_bigendian)) + swap_bytes(&data_len, sizeof(data_len)); + + if (data_len > 0 && item) { + char *str = _brick_string_alloc(data_len, line); + + *(void **)item = str; + item = str; + } + + transfer_len = data_len; + /* fallthrough */ + case FIELD_RAW: +raw: + if (unlikely(data_len < 0)) { + XIO_ERR("field = '%s' implausible data_len = %d\n", mi->field_name, data_len); + goto err; + } + if (likely(data_len > 0)) { + if (unlikely(transfer_len != data_len)) { + XIO_ERR( + "cannot 
handle generic mismatch in transfer sizes, field = '%s', %d != %d\n", + mi->field_name, + transfer_len, + data_len); + goto err; + } + status = xio_recv_raw(msock, item, data_len, data_len); + _CHECK_STATUS("recv_raw"); + } + res = data_len; + break; + default: + XIO_ERR("field '%s' unknown type = %d\n", mi->field_name, mi->field_type); + } +err: + return res; +} + +static inline +int _desc_send_struct(struct xio_socket *msock, int cache_index, const void *data, int h_meta_len, bool cork) +{ + const struct xio_desc_cache *mc = msock->s_desc_send[cache_index]; + + struct xio_desc_header header = { + .h_magic = XIO_DESC_MAGIC, + .h_cookie = mc->cache_sender_cookie, + .h_meta_len = h_meta_len, + .h_index = data ? cache_index : -1, + }; + int index; + int count = 0; + int status = 0; + + status = xio_send_raw(msock, &header, sizeof(header), cork || data); + _CHECK_STATUS("send_header"); + + if (unlikely(h_meta_len > 0)) { + status = xio_send_raw(msock, mc, h_meta_len, true); + _CHECK_STATUS("send_meta"); + } + + if (likely(data)) { + for (index = 0; index < mc->cache_items; index++) { + status = _desc_send_item(msock, data, mc, index, cork || index < mc->cache_items - 1); + _CHECK_STATUS("send_cache_item"); + count++; + } + } + + if (status >= 0) + status = count; +err: + return status; +} + +static +int desc_send_struct(struct xio_socket *msock, const void *data, const struct meta *meta, bool cork) +{ + struct xio_desc_cache *mc; + int i; + int h_meta_len = 0; + int status = -EINVAL; + + for (i = 0; i < MAX_DESC_CACHE; i++) { + mc = msock->s_desc_send[i]; + if (!mc) + break; + if (mc->cache_sender_cookie == (u64)meta) + goto found; + } + + mc = make_sender_cache(msock, meta, &i); + if (unlikely(!mc)) + goto done; + + h_meta_len = mc->cache_items * sizeof(struct xio_desc_item) + sizeof(struct xio_desc_cache); + +found: + status = _desc_send_struct(msock, i, data, h_meta_len, cork); + +done: + return status; +} + +static +int desc_recv_struct(struct xio_socket *msock, 
void *data, const struct meta *meta, int line) +{ + struct xio_desc_header header = {}; + struct xio_desc_cache *mc; + int cache_index; + int index; + int count = 0; + int status = 0; + bool need_swap = false; + + status = xio_recv_raw(msock, &header, sizeof(header), sizeof(header)); + _CHECK_STATUS("recv_header"); + + if (unlikely(header.h_magic != XIO_DESC_MAGIC)) { + need_swap = true; + SWAP_FIELD(header.h_magic); + if (unlikely(header.h_magic != XIO_DESC_MAGIC)) { + XIO_WRN( + "#%d called from line %d bad packet header magic = %llx\n", + msock->s_debug_nr, + line, + header.h_magic); + status = -ENOMSG; + goto err; + } + SWAP_FIELD(header.h_cookie); + SWAP_FIELD(header.h_meta_len); + SWAP_FIELD(header.h_index); + } + + cache_index = header.h_index; + if (cache_index < 0) { /* EOR */ + goto done; + } + if (unlikely(cache_index >= MAX_DESC_CACHE - 1)) { + XIO_WRN("#%d called from line %d bad cache index %d\n", msock->s_debug_nr, line, cache_index); + status = -EBADF; + goto err; + } + + mc = msock->s_desc_recv[cache_index]; + if (unlikely(!mc)) { + if (unlikely(header.h_meta_len <= 0)) { + XIO_WRN("#%d called from line %d missing meta information\n", msock->s_debug_nr, line); + status = -ENOMSG; + goto err; + } + + mc = _brick_block_alloc(0, PAGE_SIZE, line); + + status = xio_recv_raw(msock, mc, header.h_meta_len, header.h_meta_len); + if (unlikely(status < 0)) + brick_block_free(mc, PAGE_SIZE); + _CHECK_STATUS("recv_meta"); + + if (unlikely(need_swap)) + swap_mc(mc, header.h_meta_len); + + status = make_recver_cache(mc, meta); + if (unlikely(status < 0)) { + brick_block_free(mc, PAGE_SIZE); + goto err; + } + msock->s_desc_recv[cache_index] = mc; + } else if (unlikely(header.h_meta_len > 0)) { + XIO_WRN( + "#%d called from line %d has %d unexpected meta bytes\n", msock->s_debug_nr, line, header.h_meta_len); + status = -EMSGSIZE; + goto err; + } else if (unlikely(mc->cache_recver_cookie != (u64)meta)) { + XIO_ERR("#%d protocol error %p != %p\n", msock->s_debug_nr, 
meta, (void *)mc->cache_recver_cookie); + dump_meta((void *)mc->cache_recver_cookie); + dump_meta(meta); + status = -EPROTO; + goto err; + } + + for (index = 0; index < mc->cache_items; index++) { + status = _desc_recv_item(msock, data, mc, index, line); + _CHECK_STATUS("recv_cache_item"); + count++; + } + +done: + if (status >= 0) + status = count; +err: + return status; +} + +int xio_send_struct(struct xio_socket *msock, const void *data, const struct meta *meta) +{ + return desc_send_struct(msock, data, meta, false); +} + +int _xio_recv_struct(struct xio_socket *msock, void *data, const struct meta *meta, int line) +{ + return desc_recv_struct(msock, data, meta, line); +} + +/*********************************************************************/ + +/* High-level transport of xio structures + */ + +const struct meta xio_cmd_meta[] = { + META_INI_SUB(cmd_stamp, struct xio_cmd, xio_timespec_meta), + META_INI(cmd_code, struct xio_cmd, FIELD_INT), + META_INI(cmd_int1, struct xio_cmd, FIELD_INT), + META_INI(cmd_str1, struct xio_cmd, FIELD_STRING), + {} +}; + +int xio_send_aio(struct xio_socket *msock, struct aio_object *aio) +{ + struct xio_cmd cmd = { + .cmd_code = CMD_AIO, + .cmd_int1 = aio->io_id, + }; + int seq = 0; + int status; + + if (aio->io_rw != 0 && aio->io_data && aio->io_cs_mode < 2) + cmd.cmd_code |= CMD_FLAG_HAS_DATA; + + get_lamport(&cmd.cmd_stamp); + + status = desc_send_struct(msock, &cmd, xio_cmd_meta, true); + if (status < 0) + goto done; + + seq = 0; + status = desc_send_struct(msock, aio, xio_aio_user_meta, cmd.cmd_code & CMD_FLAG_HAS_DATA); + if (status < 0) + goto done; + + if (cmd.cmd_code & CMD_FLAG_HAS_DATA) + status = xio_send_compressed(msock, aio->io_data, aio->io_len, xio_net_compress_data, false); +done: + return status; +} + +int xio_recv_aio(struct xio_socket *msock, struct aio_object *aio, struct xio_cmd *cmd) +{ + int status; + + status = desc_recv_struct(msock, aio, xio_aio_user_meta, __LINE__); + if (status < 0) + goto done; + + 
set_lamport(&cmd->cmd_stamp); + + if (cmd->cmd_code & CMD_FLAG_HAS_DATA) { + if (!aio->io_data) + aio->io_data = brick_block_alloc(0, aio->io_len); + status = xio_recv_compressed(msock, aio->io_data, aio->io_len, aio->io_len); + if (unlikely(status < 0)) + XIO_WRN("#%d aio_len = %d, status = %d\n", msock->s_debug_nr, aio->io_len, status); + } +done: + return status; +} + +int xio_send_cb(struct xio_socket *msock, struct aio_object *aio) +{ + struct xio_cmd cmd = { + .cmd_code = CMD_CB, + .cmd_int1 = aio->io_id, + }; + int seq = 0; + int status; + + if (aio->io_rw == 0 && aio->io_data && aio->io_cs_mode < 2) + cmd.cmd_code |= CMD_FLAG_HAS_DATA; + + get_lamport(&cmd.cmd_stamp); + + status = desc_send_struct(msock, &cmd, xio_cmd_meta, true); + if (status < 0) + goto done; + + seq = 0; + status = desc_send_struct(msock, aio, xio_aio_user_meta, cmd.cmd_code & CMD_FLAG_HAS_DATA); + if (status < 0) + goto done; + + if (cmd.cmd_code & CMD_FLAG_HAS_DATA) + status = xio_send_compressed(msock, aio->io_data, aio->io_len, xio_net_compress_data, false); +done: + return status; +} + +int xio_recv_cb(struct xio_socket *msock, struct aio_object *aio, struct xio_cmd *cmd) +{ + int status; + + status = desc_recv_struct(msock, aio, xio_aio_user_meta, __LINE__); + if (status < 0) + goto done; + + set_lamport(&cmd->cmd_stamp); + + if (cmd->cmd_code & CMD_FLAG_HAS_DATA) { + if (!aio->io_data) { + XIO_WRN("#%d no internal buffer available\n", msock->s_debug_nr); + status = -EINVAL; + goto done; + } + status = xio_recv_compressed(msock, aio->io_data, aio->io_len, aio->io_len); + } +done: + return status; +} + +/***************** module init stuff ************************/ + +char *(*xio_translate_hostname)(const char *name) = NULL; + +bool xio_net_is_alive; + +int __init init_xio_net(void) +{ + XIO_INF("init_net()\n"); + xio_net_is_alive = true; + return 0; +} + +void exit_xio_net(void) +{ + xio_net_is_alive = false; + brick_string_free(id); + id = NULL; + XIO_INF("exit_net()\n"); +} diff 
--git a/include/linux/xio/xio_net.h b/include/linux/xio/xio_net.h
new file mode 100644
index 000000000000..4c000015863f
--- /dev/null
+++ b/include/linux/xio/xio_net.h
@@ -0,0 +1,177 @@
+/*
+ * MARS Long Distance Replication Software
+ *
+ * Copyright (C) 2010-2014 Thomas Schoebel-Theuer
+ * Copyright (C) 2011-2014 1&1 Internet AG
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#ifndef XIO_NET_H
+#define XIO_NET_H
+
+#include
+#include
+#include
+
+#include
+
+extern int xio_net_compress_data; /* handed to xio_send_compressed() as its compress argument */
+
+extern int xio_net_default_port;
+extern int xio_net_bind_before_listen;
+extern int xio_net_bind_before_connect;
+
+extern bool xio_net_is_alive; /* true between init_xio_net() and exit_xio_net() */
+
+#define MAX_DESC_CACHE 16 /* number of slots in s_desc_send[] and s_desc_recv[] */
+
+/* The original struct socket has no refcount. This leads to problems
+ * during long-lasting system calls when racing with socket shutdown.
+ *
+ * The original idea of struct xio_socket was just a small wrapper
+ * adding a refcount and some debugging aid.
+ * Later, some buffering was added in order to take advantage of
+ * kernel_sendpage().
+ * Caching of meta description has also been added.
+ *
+ * Notice: we have a slightly restricted parallelism model.
+ * One sender and one receiver thread may work in parallel
+ * on the same socket instance. At low level, there must not exist
+ * multiple readers in parallel to each other, or multiple
+ * writers in parallel to each other. Otherwise, higher level
+ * protocol sequences would be disturbed anyway.
+ * When needed, you may achieve higher parallelism by doing your own
+ * semaphore locking around xio_{send, recv}_struct() or even longer
+ * sequences of subsets of your high-level protocol.
+ */
+struct xio_socket {
+	struct socket *s_socket; /* the wrapped kernel socket */
+
+	u64 s_send_bytes;
+	u64 s_recv_bytes;
+	void *s_buffer; /* presumably the send buffer mentioned above - confirm */
+	atomic_t s_count; /* presumably the refcount mentioned above - confirm */
+	int s_pos;
+	int s_debug_nr; /* printed as "#%d" in the log messages of xio_net.c */
+	int s_send_abort;
+	int s_recv_abort;
+	int s_send_cnt;
+	int s_recv_cnt;
+	bool s_shutdown_on_err;
+	bool s_alive;
+
+	u8 s_send_proto; /* copied into cache_sender_proto of new sender caches */
+	u8 s_recv_proto; /* copied into cache_recver_proto of new sender caches */
+	u16 s_send_flags;
+	u16 s_recv_flags;
+	struct xio_desc_cache *s_desc_send[MAX_DESC_CACHE]; /* one slot per struct type already announced to the peer */
+	struct xio_desc_cache *s_desc_recv[MAX_DESC_CACHE]; /* one slot per struct type announced by the peer */
+};
+
+struct xio_tcp_params { /* NOTE(review): looks like one set of socket tunables per connection class - confirm */
+	int ip_tos;
+	int tcp_window_size;
+	int tcp_nodelay;
+	int tcp_timeout;
+	int tcp_keepcnt;
+	int tcp_keepintvl;
+	int tcp_keepidle;
+};
+
+extern struct xio_tcp_params repl_tcp_params;
+extern struct xio_tcp_params device_tcp_params;
+
+enum { /* command numbers, stored in xio_cmd.cmd_code */
+	CMD_NOP,
+	CMD_NOTIFY,
+	CMD_CONNECT,
+	CMD_GETINFO,
+	CMD_GETENTS,
+	CMD_AIO, /* sent by xio_send_aio() */
+	CMD_CB, /* sent by xio_send_cb() */
+	CMD_CONNECT_LOGGER,
+};
+
+#define CMD_FLAG_MASK 255 /* presumably selects the command number within cmd_code - confirm */
+#define CMD_FLAG_HAS_DATA 256 /* or-ed into cmd_code when a data payload follows the aio structure */
+
+struct xio_cmd {
+	struct timespec cmd_stamp; /* for automatic lamport clock */
+	int cmd_code; /* one of the CMD_* values, possibly or-ed with CMD_FLAG_HAS_DATA */
+	int cmd_int1; /* xio_send_aio() and xio_send_cb() store aio->io_id here */
+
+	/* int cmd_int2; */
+	/* int cmd_int3; */
+	char *cmd_str1;
+
+	/* char *cmd_str2; */
+	/* char *cmd_str3; */
+};
+
+extern const struct meta xio_cmd_meta[]; /* field description of struct xio_cmd */
+
+extern char *(*xio_translate_hostname)(const char *name); /* optional hook, NULL by default */
+
+extern char *my_id(void);
+
+/* Low-level network traffic
+ */
+extern int xio_create_sockaddr(struct sockaddr_storage *addr, const char *spec);
+
+extern int xio_create_socket(
+struct xio_socket *msock,
+struct sockaddr_storage *src_addr,
+struct sockaddr_storage *dst_addr,
+struct xio_tcp_params *params);
+
+extern int xio_accept_socket(
+struct xio_socket *new_msock, struct xio_socket *old_msock, struct xio_tcp_params *params);
+
+extern bool xio_get_socket(struct xio_socket *msock);
+extern void xio_put_socket(struct xio_socket *msock);
+extern void xio_shutdown_socket(struct xio_socket *msock);
+extern bool xio_socket_is_alive(struct xio_socket *msock);
+extern long xio_socket_send_space_available(struct xio_socket *msock);
+
+extern int xio_send_raw(struct xio_socket *msock, const void *buf, int len, bool cork);
+extern int xio_recv_raw(struct xio_socket *msock, void *buf, int minlen, int maxlen);
+
+int xio_send_compressed(struct xio_socket *msock, const void *buf, s32 len, int compress, bool cork);
+int xio_recv_compressed(struct xio_socket *msock, void *buf, int minlen, int maxlen);
+
+/* Mid-level generic field data exchange
+ *
+ * Use the xio_recv_struct() macro rather than _xio_recv_struct():
+ * it passes the caller's __LINE__, which shows up in the log messages
+ * of the receive path.
+ */
+extern int xio_send_struct(struct xio_socket *msock, const void *data, const struct meta *meta);
+#define xio_recv_struct(_sock_, _data_, _meta_)				\
+	({								\
+		_xio_recv_struct(_sock_, _data_, _meta_, __LINE__);	\
+	})
+extern int _xio_recv_struct(struct xio_socket *msock, void *data, const struct meta *meta, int line);
+
+/* High-level transport of xio structures
+ */
+extern int xio_send_dent_list(struct xio_socket *msock, struct list_head *anchor);
+extern int xio_recv_dent_list(struct xio_socket *msock, struct list_head *anchor);
+
+extern int xio_send_aio(struct xio_socket *msock, struct aio_object *aio);
+extern int xio_recv_aio(struct xio_socket *msock, struct aio_object *aio, struct xio_cmd *cmd);
+extern int xio_send_cb(struct xio_socket *msock, struct aio_object *aio);
+extern int xio_recv_cb(struct xio_socket *msock, struct aio_object *aio, struct xio_cmd *cmd);
+
+/***********************************************************************/
+
+/* init */
+
+extern int init_xio_net(void);
+extern void exit_xio_net(void);
+
+#endif
--
2.11.0