These patches fix various ERTM issues, mostly having to do with
receiver-side handling of flow control (the "local busy" state). This
new approach will not keep very much incoming data beyond what fits in
the socket receive buffer, so memory use is more tightly
controlled. When incoming data does not immediately fit in the socket
buffer, the L2CAP socket layer will now only check for available space
when the socket is read from instead of polling on a workqueue thread.
--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum
Hi Gustavo -
On Thu, 30 Jun 2011, Gustavo F. Padovan wrote:
> * Mat Martineau <[email protected]> [2011-06-29 14:35:19 -0700]:
>
>> Even when the received tx_seq is expected, the frame still needs to be
>> dropped if the TX window is exceeded or the receiver is in the local
>> busy state.
>
> The check doesn't mean that TX window is exceeded, it means that we have an
> invalid tx_seq and connection should be closed. I don't see the point in
> moving the expected check to after this one.
> About the local busy check. Is it worth to drop expected frames when on local
> busy? What are the advantages/disadvantages? Drop will trigger SREJ while
> Store will press the buffer. Do you have measures to prove this?
It's possible for expected_tx_seq to be invalid if the tx window is
full. Therefore it is important to check for an invalid tx_seq before
checking if it is expected.
Dropping frames during local busy is a good idea because:
* Queuing frames during local busy ignores the receive buffer limits
requested by the application with SO_RCVBUF. This problem gets worse
with extended window sizes, where the busy_q could be quite large
* ERTM senders are not supposed to send frames during local busy
anyway
* The spec allows for packets to be dropped in local busy (look for
"StoreOrIgnore")
It's the memory that's most important, though. Dropping all incoming
iframes during local busy is a simple way to keep data buffer size
bounded while still allowing for larger tx windows.
If the application can't keep up with the data rate and is triggering
local busy, throughput isn't going to be improved significatnly by
queuing those frames during local busy.
--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum
Hi Mat,
* Mat Martineau <[email protected]> [2011-06-29 14:35:24 -0700]:
> The ERTM receive buffer is now handled in a way that does not require
> the busy queue and the associated polling code.
This one is fine as a separate patch.
Gustavo
Hi Mat,
* Mat Martineau <[email protected]> [2011-06-29 14:35:23 -0700]:
> This change moves most L2CAP ERTM receive buffer handling out of the
> L2CAP core and in to the socket code. It's up to the higher layer
> (the socket code, in this case) to tell the core when its buffer is
> full or has space available. The recv op should always accept
> incoming ERTM data or else the connection will go down.
>
> Within the socket layer, an skb that does not fit in the socket
> receive buffer will be temporarily stored. When the socket is read
> from, that skb will be placed in the receive buffer if possible. Once
> adequate buffer space becomes available, the L2CAP core is informed
> and the ERTM local busy state is cleared.
>
> Receive buffer management for non-ERTM modes is unchanged.
>
> Signed-off-by: Mat Martineau <[email protected]>
> ---
> include/net/bluetooth/l2cap.h | 3 ++
> net/bluetooth/l2cap_core.c | 41 ++++++++++++++++---------
> net/bluetooth/l2cap_sock.c | 67 +++++++++++++++++++++++++++++++++++++++--
> 3 files changed, 93 insertions(+), 18 deletions(-)
>
> diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
> index 9c18e55..82387c5 100644
> --- a/include/net/bluetooth/l2cap.h
> +++ b/include/net/bluetooth/l2cap.h
> @@ -422,6 +422,8 @@ struct l2cap_conn {
> struct l2cap_pinfo {
> struct bt_sock bt;
> struct l2cap_chan *chan;
> + struct sk_buff *rx_busy_skb;
> + int rx_busy;
> };
>
> enum {
> @@ -498,5 +500,6 @@ void l2cap_chan_close(struct l2cap_chan *chan, int reason);
> void l2cap_chan_destroy(struct l2cap_chan *chan);
> int l2cap_chan_connect(struct l2cap_chan *chan);
> int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len);
> +void l2cap_chan_busy(struct l2cap_chan *chan, int busy);
>
> #endif /* __L2CAP_H */
> diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
> index 4b764b1..f0e7ba7 100644
> --- a/net/bluetooth/l2cap_core.c
> +++ b/net/bluetooth/l2cap_core.c
> @@ -3350,21 +3350,21 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
> }
>
> err = l2cap_ertm_reassembly_sdu(chan, skb, control);
> - if (err >= 0) {
> - chan->buffer_seq = (chan->buffer_seq + 1) % 64;
> - return err;
> - }
> -
> - l2cap_ertm_enter_local_busy(chan);
> -
> - bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
> - __skb_queue_tail(&chan->busy_q, skb);
> -
> - queue_work(_busy_wq, &chan->busy_work);
> + chan->buffer_seq = (chan->buffer_seq + 1) % 64;
>
> return err;
> }
>
> +void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
> +{
> + if (chan->mode == L2CAP_MODE_ERTM) {
> + if (busy)
> + l2cap_ertm_enter_local_busy(chan);
> + else
> + l2cap_ertm_exit_local_busy(chan);
> + }
> +}
> +
> static int l2cap_streaming_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
> {
> struct sk_buff *_skb;
> @@ -3463,13 +3463,22 @@ static void l2cap_check_srej_gap(struct l2cap_chan *chan, u8 tx_seq)
> struct sk_buff *skb;
> u16 control;
>
> - while ((skb = skb_peek(&chan->srej_q))) {
> + while ((skb = skb_peek(&chan->srej_q)) &&
> + !test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
> + int err;
> +
> if (bt_cb(skb)->tx_seq != tx_seq)
> break;
>
> skb = skb_dequeue(&chan->srej_q);
> control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
> - l2cap_ertm_reassembly_sdu(chan, skb, control);
> + err = l2cap_ertm_reassembly_sdu(chan, skb, control);
> +
> + if (err < 0) {
> + l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
> + break;
> + }
> +
> chan->buffer_seq_srej =
> (chan->buffer_seq_srej + 1) % 64;
> tx_seq = (tx_seq + 1) % 64;
> @@ -3625,8 +3634,10 @@ expected:
> }
>
> err = l2cap_push_rx_skb(chan, skb, rx_control);
> - if (err < 0)
> - return 0;
> + if (err < 0) {
> + l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
> + return err;
> + }
>
> if (rx_control & L2CAP_CTRL_FINAL) {
> if (!test_and_clear_bit(CONN_REJ_ACT, &chan->conn_state))
> diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c
> index 39082d4..ab0494b 100644
> --- a/net/bluetooth/l2cap_sock.c
> +++ b/net/bluetooth/l2cap_sock.c
> @@ -711,6 +711,8 @@ static int l2cap_sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct ms
> static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t len, int flags)
> {
> struct sock *sk = sock->sk;
> + struct l2cap_pinfo *pi = l2cap_pi(sk);
> + int err;
>
> lock_sock(sk);
>
> @@ -725,9 +727,40 @@ static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct ms
> release_sock(sk);
>
> if (sock->type == SOCK_STREAM)
> - return bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
> + err = bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
> + else
> + err = bt_sock_recvmsg(iocb, sock, msg, len, flags);
> +
> + if (pi->chan->mode == L2CAP_MODE_ERTM) {
> + int queue_err;
> + int threshold;
> +
> + /* Attempt to put pending rx data in the socket buffer */
> +
> + lock_sock(sk);
> + if (pi->rx_busy_skb) {
> + queue_err = sock_queue_rcv_skb(sk, pi->rx_busy_skb);
> +
> + if (!queue_err)
> + pi->rx_busy_skb = NULL;
> + }
> +
> + /* Restore data flow when half of the receive buffer is
> + * available. This avoids resending large numbers of
> + * frames.
> + */
> + threshold = sk->sk_rcvbuf >> 1;
> + if (pi->rx_busy && !pi->rx_busy_skb &&
> + atomic_read(&sk->sk_rmem_alloc) <= threshold) {
> +
> + pi->rx_busy = 0;
> + l2cap_chan_busy(l2cap_pi(sk)->chan, pi->rx_busy);
> + }
> +
> + release_sock(sk);
This is to much core logic outside of the core. I didn't think hard on this
but this can be simplified to just read the threshold and report core if we
are not busy anymore, and then get rid of rx_busy and rx_busy_skb.
> + }
>
> - return bt_sock_recvmsg(iocb, sock, msg, len, flags);
> + return err;
> }
>
> /* Kill socket (only if zapped and orphan)
> @@ -811,9 +844,32 @@ static struct l2cap_chan *l2cap_sock_new_connection_cb(void *data)
>
> static int l2cap_sock_recv_cb(void *data, struct sk_buff *skb)
> {
> + int err;
> struct sock *sk = data;
> + struct l2cap_pinfo *pi = l2cap_pi(sk);
> +
> + if (pi->rx_busy_skb)
> + return -ENOMEM;
> +
> + err = sock_queue_rcv_skb(sk, skb);
> +
> + /* For ERTM, handle one skb that doesn't fit into the recv
> + * buffer. This is important to do because the data frames
> + * have already been acked, so the skb cannot be discarded.
> + *
> + * Notify the l2cap core that the buffer is full, so the
> + * LOCAL_BUSY state is entered and no more frames are
> + * acked and reassembled until there is buffer space
> + * available.
> + */
> + if (err < 0 && pi->chan->mode == L2CAP_MODE_ERTM) {
> + pi->rx_busy = 1;
> + pi->rx_busy_skb = skb;
> + l2cap_chan_busy(pi->chan, pi->rx_busy);
> + err = 0;
> + }
This can totally be in l2cap_core.c. The idea is move Core stuff to
l2cap_core.c and not let the L2CAP users handle this. Just enter in LOCAL_BUSY
when recv_cb returns returns less than 0.
Gustavo
Hi Mat,
* Mat Martineau <[email protected]> [2011-06-29 14:35:20 -0700]:
> Signed-off-by: Mat Martineau <[email protected]>
> ---
> net/bluetooth/l2cap_core.c | 12 ++++++------
> 1 files changed, 6 insertions(+), 6 deletions(-)
Applied, thanks.
Gustavo
Hi Mat,
* Mat Martineau <[email protected]> [2011-06-29 14:35:21 -0700]:
> ERTM timeouts are defined in milliseconds, but need to be converted
> to jiffies when passed to mod_timer().
>
> Signed-off-by: Mat Martineau <[email protected]>
Good catch. I lacked some test on this I guess. Applied, thanks.
Gustavo
* Mat Martineau <[email protected]> [2011-06-29 14:35:19 -0700]:
> Even when the received tx_seq is expected, the frame still needs to be
> dropped if the TX window is exceeded or the receiver is in the local
> busy state.
The check doesn't mean that TX window is exceeded, it means that we have an
invalid tx_seq and connection should be closed. I don't see the point in
moving the expected check to after this one.
About the local busy check. Is it worth to drop expected frames when on local
busy? What are the advantages/disadvantages? Drop will trigger SREJ while
Store will press the buffer. Do you have measures to prove this?
Gustavo
Even when the received tx_seq is expected, the frame still needs to be
dropped if the TX window is exceeded or the receiver is in the local
busy state.
Signed-off-by: Mat Martineau <[email protected]>
---
net/bluetooth/l2cap_core.c | 6 +++---
1 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
index 9ec9c8c..74de2b7 100644
--- a/net/bluetooth/l2cap_core.c
+++ b/net/bluetooth/l2cap_core.c
@@ -3522,9 +3522,6 @@ static inline int l2cap_data_channel_iframe(struct l2cap_chan *chan, u16 rx_cont
chan->expected_ack_seq = req_seq;
l2cap_drop_acked_frames(chan);
- if (tx_seq == chan->expected_tx_seq)
- goto expected;
-
tx_seq_offset = (tx_seq - chan->buffer_seq) % 64;
if (tx_seq_offset < 0)
tx_seq_offset += 64;
@@ -3538,6 +3535,9 @@ static inline int l2cap_data_channel_iframe(struct l2cap_chan *chan, u16 rx_cont
if (test_bit(CONN_LOCAL_BUSY, &chan->conn_state))
goto drop;
+ if (tx_seq == chan->expected_tx_seq)
+ goto expected;
+
if (test_bit(CONN_SREJ_SENT, &chan->conn_state)) {
struct srej_list *first;
--
1.7.5.4
--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum
Signed-off-by: Mat Martineau <[email protected]>
---
net/bluetooth/l2cap_core.c | 12 ++++++------
1 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
index 74de2b7..24c94a3 100644
--- a/net/bluetooth/l2cap_core.c
+++ b/net/bluetooth/l2cap_core.c
@@ -223,18 +223,18 @@ static u16 l2cap_alloc_cid(struct l2cap_conn *conn)
static void l2cap_set_timer(struct l2cap_chan *chan, struct timer_list *timer, long timeout)
{
- BT_DBG("chan %p state %d timeout %ld", chan->sk, chan->state, timeout);
+ BT_DBG("chan %p state %d timeout %ld", chan->sk, chan->state, timeout);
- if (!mod_timer(timer, jiffies + timeout))
- chan_hold(chan);
+ if (!mod_timer(timer, jiffies + timeout))
+ chan_hold(chan);
}
static void l2cap_clear_timer(struct l2cap_chan *chan, struct timer_list *timer)
{
- BT_DBG("chan %p state %d", chan, chan->state);
+ BT_DBG("chan %p state %d", chan, chan->state);
- if (timer_pending(timer) && del_timer(timer))
- chan_put(chan);
+ if (timer_pending(timer) && del_timer(timer))
+ chan_put(chan);
}
static void l2cap_state_change(struct l2cap_chan *chan, int state)
--
1.7.5.4
--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum
ERTM timeouts are defined in milliseconds, but need to be converted
to jiffies when passed to mod_timer().
Signed-off-by: Mat Martineau <[email protected]>
---
net/bluetooth/l2cap_core.c | 2 +-
1 files changed, 1 insertions(+), 1 deletions(-)
diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
index 24c94a3..da6f532 100644
--- a/net/bluetooth/l2cap_core.c
+++ b/net/bluetooth/l2cap_core.c
@@ -225,7 +225,7 @@ static void l2cap_set_timer(struct l2cap_chan *chan, struct timer_list *timer, l
{
BT_DBG("chan %p state %d timeout %ld", chan->sk, chan->state, timeout);
- if (!mod_timer(timer, jiffies + timeout))
+ if (!mod_timer(timer, jiffies + msecs_to_jiffies(timeout)))
chan_hold(chan);
}
--
1.7.5.4
--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum
The local busy state is entered and exited based on buffer status in
the socket layer (or other upper layer). This change is in
preparation for general buffer status reports from the socket layer,
which will then be used to change the local busy status.
Signed-off-by: Mat Martineau <[email protected]>
---
net/bluetooth/l2cap_core.c | 62 +++++++++++++++++++++++++++-----------------
1 files changed, 38 insertions(+), 24 deletions(-)
diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
index da6f532..4b764b1 100644
--- a/net/bluetooth/l2cap_core.c
+++ b/net/bluetooth/l2cap_core.c
@@ -3227,22 +3227,26 @@ disconnect:
return 0;
}
-static int l2cap_try_push_rx_skb(struct l2cap_chan *chan)
+static void l2cap_ertm_enter_local_busy(struct l2cap_chan *chan)
{
- struct sk_buff *skb;
u16 control;
- int err;
- while ((skb = skb_dequeue(&chan->busy_q))) {
- control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
- err = l2cap_ertm_reassembly_sdu(chan, skb, control);
- if (err < 0) {
- skb_queue_head(&chan->busy_q, skb);
- return -EBUSY;
- }
+ BT_DBG("chan %p, Enter local busy", chan);
- chan->buffer_seq = (chan->buffer_seq + 1) % 64;
- }
+ set_bit(CONN_LOCAL_BUSY, &chan->conn_state);
+
+ control = chan->buffer_seq << L2CAP_CTRL_REQSEQ_SHIFT;
+ control |= L2CAP_SUPER_RCV_NOT_READY;
+ l2cap_send_sframe(chan, control);
+
+ set_bit(CONN_RNR_SENT, &chan->conn_state);
+
+ __clear_ack_timer(chan);
+}
+
+static void l2cap_ertm_exit_local_busy(struct l2cap_chan *chan)
+{
+ u16 control;
if (!test_bit(CONN_RNR_SENT, &chan->conn_state))
goto done;
@@ -3262,6 +3266,26 @@ done:
clear_bit(CONN_RNR_SENT, &chan->conn_state);
BT_DBG("chan %p, Exit local busy", chan);
+}
+
+static int l2cap_try_push_rx_skb(struct l2cap_chan *chan)
+{
+ struct sk_buff *skb;
+ u16 control;
+ int err;
+
+ while ((skb = skb_dequeue(&chan->busy_q))) {
+ control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
+ err = l2cap_ertm_reassembly_sdu(chan, skb, control);
+ if (err < 0) {
+ skb_queue_head(&chan->busy_q, skb);
+ return -EBUSY;
+ }
+
+ chan->buffer_seq = (chan->buffer_seq + 1) % 64;
+ }
+
+ l2cap_ertm_exit_local_busy(chan);
return 0;
}
@@ -3315,7 +3339,7 @@ static void l2cap_busy_work(struct work_struct *work)
static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
{
- int sctrl, err;
+ int err;
if (test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
@@ -3331,21 +3355,11 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
return err;
}
- /* Busy Condition */
- BT_DBG("chan %p, Enter local busy", chan);
+ l2cap_ertm_enter_local_busy(chan);
- set_bit(CONN_LOCAL_BUSY, &chan->conn_state);
bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
__skb_queue_tail(&chan->busy_q, skb);
- sctrl = chan->buffer_seq << L2CAP_CTRL_REQSEQ_SHIFT;
- sctrl |= L2CAP_SUPER_RCV_NOT_READY;
- l2cap_send_sframe(chan, sctrl);
-
- set_bit(CONN_RNR_SENT, &chan->conn_state);
-
- __clear_ack_timer(chan);
-
queue_work(_busy_wq, &chan->busy_work);
return err;
--
1.7.5.4
--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum
This change moves most L2CAP ERTM receive buffer handling out of the
L2CAP core and in to the socket code. It's up to the higher layer
(the socket code, in this case) to tell the core when its buffer is
full or has space available. The recv op should always accept
incoming ERTM data or else the connection will go down.
Within the socket layer, an skb that does not fit in the socket
receive buffer will be temporarily stored. When the socket is read
from, that skb will be placed in the receive buffer if possible. Once
adequate buffer space becomes available, the L2CAP core is informed
and the ERTM local busy state is cleared.
Receive buffer management for non-ERTM modes is unchanged.
Signed-off-by: Mat Martineau <[email protected]>
---
include/net/bluetooth/l2cap.h | 3 ++
net/bluetooth/l2cap_core.c | 41 ++++++++++++++++---------
net/bluetooth/l2cap_sock.c | 67 +++++++++++++++++++++++++++++++++++++++--
3 files changed, 93 insertions(+), 18 deletions(-)
diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
index 9c18e55..82387c5 100644
--- a/include/net/bluetooth/l2cap.h
+++ b/include/net/bluetooth/l2cap.h
@@ -422,6 +422,8 @@ struct l2cap_conn {
struct l2cap_pinfo {
struct bt_sock bt;
struct l2cap_chan *chan;
+ struct sk_buff *rx_busy_skb;
+ int rx_busy;
};
enum {
@@ -498,5 +500,6 @@ void l2cap_chan_close(struct l2cap_chan *chan, int reason);
void l2cap_chan_destroy(struct l2cap_chan *chan);
int l2cap_chan_connect(struct l2cap_chan *chan);
int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len);
+void l2cap_chan_busy(struct l2cap_chan *chan, int busy);
#endif /* __L2CAP_H */
diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
index 4b764b1..f0e7ba7 100644
--- a/net/bluetooth/l2cap_core.c
+++ b/net/bluetooth/l2cap_core.c
@@ -3350,21 +3350,21 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
}
err = l2cap_ertm_reassembly_sdu(chan, skb, control);
- if (err >= 0) {
- chan->buffer_seq = (chan->buffer_seq + 1) % 64;
- return err;
- }
-
- l2cap_ertm_enter_local_busy(chan);
-
- bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
- __skb_queue_tail(&chan->busy_q, skb);
-
- queue_work(_busy_wq, &chan->busy_work);
+ chan->buffer_seq = (chan->buffer_seq + 1) % 64;
return err;
}
+void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
+{
+ if (chan->mode == L2CAP_MODE_ERTM) {
+ if (busy)
+ l2cap_ertm_enter_local_busy(chan);
+ else
+ l2cap_ertm_exit_local_busy(chan);
+ }
+}
+
static int l2cap_streaming_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
{
struct sk_buff *_skb;
@@ -3463,13 +3463,22 @@ static void l2cap_check_srej_gap(struct l2cap_chan *chan, u8 tx_seq)
struct sk_buff *skb;
u16 control;
- while ((skb = skb_peek(&chan->srej_q))) {
+ while ((skb = skb_peek(&chan->srej_q)) &&
+ !test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
+ int err;
+
if (bt_cb(skb)->tx_seq != tx_seq)
break;
skb = skb_dequeue(&chan->srej_q);
control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
- l2cap_ertm_reassembly_sdu(chan, skb, control);
+ err = l2cap_ertm_reassembly_sdu(chan, skb, control);
+
+ if (err < 0) {
+ l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
+ break;
+ }
+
chan->buffer_seq_srej =
(chan->buffer_seq_srej + 1) % 64;
tx_seq = (tx_seq + 1) % 64;
@@ -3625,8 +3634,10 @@ expected:
}
err = l2cap_push_rx_skb(chan, skb, rx_control);
- if (err < 0)
- return 0;
+ if (err < 0) {
+ l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
+ return err;
+ }
if (rx_control & L2CAP_CTRL_FINAL) {
if (!test_and_clear_bit(CONN_REJ_ACT, &chan->conn_state))
diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c
index 39082d4..ab0494b 100644
--- a/net/bluetooth/l2cap_sock.c
+++ b/net/bluetooth/l2cap_sock.c
@@ -711,6 +711,8 @@ static int l2cap_sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct ms
static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t len, int flags)
{
struct sock *sk = sock->sk;
+ struct l2cap_pinfo *pi = l2cap_pi(sk);
+ int err;
lock_sock(sk);
@@ -725,9 +727,40 @@ static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct ms
release_sock(sk);
if (sock->type == SOCK_STREAM)
- return bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
+ err = bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
+ else
+ err = bt_sock_recvmsg(iocb, sock, msg, len, flags);
+
+ if (pi->chan->mode == L2CAP_MODE_ERTM) {
+ int queue_err;
+ int threshold;
+
+ /* Attempt to put pending rx data in the socket buffer */
+
+ lock_sock(sk);
+ if (pi->rx_busy_skb) {
+ queue_err = sock_queue_rcv_skb(sk, pi->rx_busy_skb);
+
+ if (!queue_err)
+ pi->rx_busy_skb = NULL;
+ }
+
+ /* Restore data flow when half of the receive buffer is
+ * available. This avoids resending large numbers of
+ * frames.
+ */
+ threshold = sk->sk_rcvbuf >> 1;
+ if (pi->rx_busy && !pi->rx_busy_skb &&
+ atomic_read(&sk->sk_rmem_alloc) <= threshold) {
+
+ pi->rx_busy = 0;
+ l2cap_chan_busy(l2cap_pi(sk)->chan, pi->rx_busy);
+ }
+
+ release_sock(sk);
+ }
- return bt_sock_recvmsg(iocb, sock, msg, len, flags);
+ return err;
}
/* Kill socket (only if zapped and orphan)
@@ -811,9 +844,32 @@ static struct l2cap_chan *l2cap_sock_new_connection_cb(void *data)
static int l2cap_sock_recv_cb(void *data, struct sk_buff *skb)
{
+ int err;
struct sock *sk = data;
+ struct l2cap_pinfo *pi = l2cap_pi(sk);
+
+ if (pi->rx_busy_skb)
+ return -ENOMEM;
+
+ err = sock_queue_rcv_skb(sk, skb);
+
+ /* For ERTM, handle one skb that doesn't fit into the recv
+ * buffer. This is important to do because the data frames
+ * have already been acked, so the skb cannot be discarded.
+ *
+ * Notify the l2cap core that the buffer is full, so the
+ * LOCAL_BUSY state is entered and no more frames are
+ * acked and reassembled until there is buffer space
+ * available.
+ */
+ if (err < 0 && pi->chan->mode == L2CAP_MODE_ERTM) {
+ pi->rx_busy = 1;
+ pi->rx_busy_skb = skb;
+ l2cap_chan_busy(pi->chan, pi->rx_busy);
+ err = 0;
+ }
- return sock_queue_rcv_skb(sk, skb);
+ return err;
}
static void l2cap_sock_close_cb(void *data)
@@ -842,6 +898,11 @@ static void l2cap_sock_destruct(struct sock *sk)
{
BT_DBG("sk %p", sk);
+ if (l2cap_pi(sk)->rx_busy_skb) {
+ kfree_skb(l2cap_pi(sk)->rx_busy_skb);
+ l2cap_pi(sk)->rx_busy_skb = NULL;
+ }
+
skb_queue_purge(&sk->sk_receive_queue);
skb_queue_purge(&sk->sk_write_queue);
}
--
1.7.5.4
--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum
The ERTM receive buffer is now handled in a way that does not require
the busy queue and the associated polling code.
Signed-off-by: Mat Martineau <[email protected]>
---
include/net/bluetooth/l2cap.h | 4 -
net/bluetooth/l2cap_core.c | 125 +++--------------------------------------
2 files changed, 8 insertions(+), 121 deletions(-)
diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
index 82387c5..42fc48e 100644
--- a/include/net/bluetooth/l2cap.h
+++ b/include/net/bluetooth/l2cap.h
@@ -37,7 +37,6 @@
#define L2CAP_DEFAULT_MONITOR_TO 12000 /* 12 seconds */
#define L2CAP_DEFAULT_MAX_PDU_SIZE 1009 /* Sized for 3-DH5 packet */
#define L2CAP_DEFAULT_ACK_TO 200
-#define L2CAP_LOCAL_BUSY_TRIES 12
#define L2CAP_LE_DEFAULT_MTU 23
#define L2CAP_CONN_TIMEOUT (40000) /* 40 seconds */
@@ -352,8 +351,6 @@ struct l2cap_chan {
struct sk_buff *tx_send_head;
struct sk_buff_head tx_q;
struct sk_buff_head srej_q;
- struct sk_buff_head busy_q;
- struct work_struct busy_work;
struct list_head srej_l;
struct list_head list;
@@ -451,7 +448,6 @@ enum {
CONN_REJ_ACT,
CONN_SEND_FBIT,
CONN_RNR_SENT,
- CONN_SAR_RETRY,
};
#define __set_chan_timer(c, t) l2cap_set_timer(c, &c->chan_timer, (t))
diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
index f0e7ba7..8c9c8ba 100644
--- a/net/bluetooth/l2cap_core.c
+++ b/net/bluetooth/l2cap_core.c
@@ -61,13 +61,9 @@ int disable_ertm;
static u32 l2cap_feat_mask = L2CAP_FEAT_FIXED_CHAN;
static u8 l2cap_fixed_chan[8] = { 0x02, };
-static struct workqueue_struct *_busy_wq;
-
static LIST_HEAD(chan_list);
static DEFINE_RWLOCK(chan_list_lock);
-static void l2cap_busy_work(struct work_struct *work);
-
static struct sk_buff *l2cap_build_cmd(struct l2cap_conn *conn,
u8 code, u8 ident, u16 dlen, void *data);
static void l2cap_send_cmd(struct l2cap_conn *conn, u8 ident, u8 code, u16 len,
@@ -395,7 +391,6 @@ static void l2cap_chan_del(struct l2cap_chan *chan, int err)
__clear_ack_timer(chan);
skb_queue_purge(&chan->srej_q);
- skb_queue_purge(&chan->busy_q);
list_for_each_entry_safe(l, tmp, &chan->srej_l, list) {
list_del(&l->list);
@@ -1873,11 +1868,9 @@ static inline void l2cap_ertm_init(struct l2cap_chan *chan)
setup_timer(&chan->ack_timer, l2cap_ack_timeout, (unsigned long) chan);
skb_queue_head_init(&chan->srej_q);
- skb_queue_head_init(&chan->busy_q);
INIT_LIST_HEAD(&chan->srej_l);
- INIT_WORK(&chan->busy_work, l2cap_busy_work);
sk->sk_backlog_rcv = l2cap_ertm_data_rcv;
}
@@ -3182,32 +3175,27 @@ static int l2cap_ertm_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *sk
if (!chan->sdu)
goto disconnect;
- if (!test_bit(CONN_SAR_RETRY, &chan->conn_state)) {
- chan->partial_sdu_len += skb->len;
+ chan->partial_sdu_len += skb->len;
- if (chan->partial_sdu_len > chan->imtu)
- goto drop;
+ if (chan->partial_sdu_len > chan->imtu)
+ goto drop;
- if (chan->partial_sdu_len != chan->sdu_len)
- goto drop;
+ if (chan->partial_sdu_len != chan->sdu_len)
+ goto drop;
- memcpy(skb_put(chan->sdu, skb->len), skb->data, skb->len);
- }
+ memcpy(skb_put(chan->sdu, skb->len), skb->data, skb->len);
_skb = skb_clone(chan->sdu, GFP_ATOMIC);
if (!_skb) {
- set_bit(CONN_SAR_RETRY, &chan->conn_state);
return -ENOMEM;
}
err = chan->ops->recv(chan->data, _skb);
if (err < 0) {
kfree_skb(_skb);
- set_bit(CONN_SAR_RETRY, &chan->conn_state);
return err;
}
- clear_bit(CONN_SAR_RETRY, &chan->conn_state);
clear_bit(CONN_SAR_SDU, &chan->conn_state);
kfree_skb(chan->sdu);
@@ -3268,93 +3256,6 @@ done:
BT_DBG("chan %p, Exit local busy", chan);
}
-static int l2cap_try_push_rx_skb(struct l2cap_chan *chan)
-{
- struct sk_buff *skb;
- u16 control;
- int err;
-
- while ((skb = skb_dequeue(&chan->busy_q))) {
- control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
- err = l2cap_ertm_reassembly_sdu(chan, skb, control);
- if (err < 0) {
- skb_queue_head(&chan->busy_q, skb);
- return -EBUSY;
- }
-
- chan->buffer_seq = (chan->buffer_seq + 1) % 64;
- }
-
- l2cap_ertm_exit_local_busy(chan);
-
- return 0;
-}
-
-static void l2cap_busy_work(struct work_struct *work)
-{
- DECLARE_WAITQUEUE(wait, current);
- struct l2cap_chan *chan =
- container_of(work, struct l2cap_chan, busy_work);
- struct sock *sk = chan->sk;
- int n_tries = 0, timeo = HZ/5, err;
- struct sk_buff *skb;
-
- lock_sock(sk);
-
- add_wait_queue(sk_sleep(sk), &wait);
- while ((skb = skb_peek(&chan->busy_q))) {
- set_current_state(TASK_INTERRUPTIBLE);
-
- if (n_tries++ > L2CAP_LOCAL_BUSY_TRIES) {
- err = -EBUSY;
- l2cap_send_disconn_req(chan->conn, chan, EBUSY);
- break;
- }
-
- if (!timeo)
- timeo = HZ/5;
-
- if (signal_pending(current)) {
- err = sock_intr_errno(timeo);
- break;
- }
-
- release_sock(sk);
- timeo = schedule_timeout(timeo);
- lock_sock(sk);
-
- err = sock_error(sk);
- if (err)
- break;
-
- if (l2cap_try_push_rx_skb(chan) == 0)
- break;
- }
-
- set_current_state(TASK_RUNNING);
- remove_wait_queue(sk_sleep(sk), &wait);
-
- release_sock(sk);
-}
-
-static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
-{
- int err;
-
- if (test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
- bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
- __skb_queue_tail(&chan->busy_q, skb);
- return l2cap_try_push_rx_skb(chan);
-
-
- }
-
- err = l2cap_ertm_reassembly_sdu(chan, skb, control);
- chan->buffer_seq = (chan->buffer_seq + 1) % 64;
-
- return err;
-}
-
void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
{
if (chan->mode == L2CAP_MODE_ERTM) {
@@ -3612,7 +3513,6 @@ static inline int l2cap_data_channel_iframe(struct l2cap_chan *chan, u16 rx_cont
chan->buffer_seq_srej = chan->buffer_seq;
__skb_queue_head_init(&chan->srej_q);
- __skb_queue_head_init(&chan->busy_q);
l2cap_add_to_srej_queue(chan, skb, tx_seq, sar);
set_bit(CONN_SEND_PBIT, &chan->conn_state);
@@ -3633,7 +3533,8 @@ expected:
return 0;
}
- err = l2cap_push_rx_skb(chan, skb, rx_control);
+ err = l2cap_ertm_reassembly_sdu(chan, skb, rx_control);
+ chan->buffer_seq = (chan->buffer_seq + 1) % 64;
if (err < 0) {
l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
return err;
@@ -4439,12 +4340,6 @@ int __init l2cap_init(void)
if (err < 0)
return err;
- _busy_wq = create_singlethread_workqueue("l2cap");
- if (!_busy_wq) {
- err = -ENOMEM;
- goto error;
- }
-
err = hci_register_proto(&l2cap_hci_proto);
if (err < 0) {
BT_ERR("L2CAP protocol registration failed");
@@ -4462,7 +4357,6 @@ int __init l2cap_init(void)
return 0;
error:
- destroy_workqueue(_busy_wq);
l2cap_cleanup_sockets();
return err;
}
@@ -4471,9 +4365,6 @@ void l2cap_exit(void)
{
debugfs_remove(l2cap_debugfs);
- flush_workqueue(_busy_wq);
- destroy_workqueue(_busy_wq);
-
if (hci_unregister_proto(&l2cap_hci_proto) < 0)
BT_ERR("L2CAP protocol unregistration failed");
--
1.7.5.4
--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum
Hi Mat,
* Mat Martineau <[email protected]> [2011-07-01 10:29:23 -0700]:
>
> Hi Gustavo -
>
> On Thu, 30 Jun 2011, Gustavo F. Padovan wrote:
>
> >Hi Mat,
> >
> >* Mat Martineau <[email protected]> [2011-06-29 14:35:23 -0700]:
> >
> >>This change moves most L2CAP ERTM receive buffer handling out of the
> >>L2CAP core and in to the socket code. It's up to the higher layer
> >>(the socket code, in this case) to tell the core when its buffer is
> >>full or has space available. The recv op should always accept
> >>incoming ERTM data or else the connection will go down.
> >>
> >>Within the socket layer, an skb that does not fit in the socket
> >>receive buffer will be temporarily stored. When the socket is read
> >>from, that skb will be placed in the receive buffer if possible. Once
> >>adequate buffer space becomes available, the L2CAP core is informed
> >>and the ERTM local busy state is cleared.
> >>
> >>Receive buffer management for non-ERTM modes is unchanged.
> >>
> >>Signed-off-by: Mat Martineau <[email protected]>
> >>---
> >> include/net/bluetooth/l2cap.h | 3 ++
> >> net/bluetooth/l2cap_core.c | 41 ++++++++++++++++---------
> >> net/bluetooth/l2cap_sock.c | 67 +++++++++++++++++++++++++++++++++++++++--
> >> 3 files changed, 93 insertions(+), 18 deletions(-)
> >>
> >>diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
> >>index 9c18e55..82387c5 100644
> >>--- a/include/net/bluetooth/l2cap.h
> >>+++ b/include/net/bluetooth/l2cap.h
> >>@@ -422,6 +422,8 @@ struct l2cap_conn {
> >> struct l2cap_pinfo {
> >> struct bt_sock bt;
> >> struct l2cap_chan *chan;
> >>+ struct sk_buff *rx_busy_skb;
> >>+ int rx_busy;
> >> };
> >>
> >> enum {
> >>@@ -498,5 +500,6 @@ void l2cap_chan_close(struct l2cap_chan *chan, int reason);
> >> void l2cap_chan_destroy(struct l2cap_chan *chan);
> >> int l2cap_chan_connect(struct l2cap_chan *chan);
> >> int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len);
> >>+void l2cap_chan_busy(struct l2cap_chan *chan, int busy);
> >>
> >> #endif /* __L2CAP_H */
> >>diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
> >>index 4b764b1..f0e7ba7 100644
> >>--- a/net/bluetooth/l2cap_core.c
> >>+++ b/net/bluetooth/l2cap_core.c
> >>@@ -3350,21 +3350,21 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
> >> }
> >>
> >> err = l2cap_ertm_reassembly_sdu(chan, skb, control);
> >>- if (err >= 0) {
> >>- chan->buffer_seq = (chan->buffer_seq + 1) % 64;
> >>- return err;
> >>- }
> >>-
> >>- l2cap_ertm_enter_local_busy(chan);
> >>-
> >>- bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
> >>- __skb_queue_tail(&chan->busy_q, skb);
> >>-
> >>- queue_work(_busy_wq, &chan->busy_work);
> >>+ chan->buffer_seq = (chan->buffer_seq + 1) % 64;
> >>
> >> return err;
> >> }
> >>
> >>+void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
> >>+{
> >>+ if (chan->mode == L2CAP_MODE_ERTM) {
> >>+ if (busy)
> >>+ l2cap_ertm_enter_local_busy(chan);
> >>+ else
> >>+ l2cap_ertm_exit_local_busy(chan);
> >>+ }
> >>+}
> >>+
> >> static int l2cap_streaming_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
> >> {
> >> struct sk_buff *_skb;
> >>@@ -3463,13 +3463,22 @@ static void l2cap_check_srej_gap(struct l2cap_chan *chan, u8 tx_seq)
> >> struct sk_buff *skb;
> >> u16 control;
> >>
> >>- while ((skb = skb_peek(&chan->srej_q))) {
> >>+ while ((skb = skb_peek(&chan->srej_q)) &&
> >>+ !test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
> >>+ int err;
> >>+
> >> if (bt_cb(skb)->tx_seq != tx_seq)
> >> break;
> >>
> >> skb = skb_dequeue(&chan->srej_q);
> >> control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
> >>- l2cap_ertm_reassembly_sdu(chan, skb, control);
> >>+ err = l2cap_ertm_reassembly_sdu(chan, skb, control);
> >>+
> >>+ if (err < 0) {
> >>+ l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
> >>+ break;
> >>+ }
> >>+
> >> chan->buffer_seq_srej =
> >> (chan->buffer_seq_srej + 1) % 64;
> >> tx_seq = (tx_seq + 1) % 64;
> >>@@ -3625,8 +3634,10 @@ expected:
> >> }
> >>
> >> err = l2cap_push_rx_skb(chan, skb, rx_control);
> >>- if (err < 0)
> >>- return 0;
> >>+ if (err < 0) {
> >>+ l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
> >>+ return err;
> >>+ }
> >>
> >> if (rx_control & L2CAP_CTRL_FINAL) {
> >> if (!test_and_clear_bit(CONN_REJ_ACT, &chan->conn_state))
> >>diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c
> >>index 39082d4..ab0494b 100644
> >>--- a/net/bluetooth/l2cap_sock.c
> >>+++ b/net/bluetooth/l2cap_sock.c
> >>@@ -711,6 +711,8 @@ static int l2cap_sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct ms
> >> static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t len, int flags)
> >> {
> >> struct sock *sk = sock->sk;
> >>+ struct l2cap_pinfo *pi = l2cap_pi(sk);
> >>+ int err;
> >>
> >> lock_sock(sk);
> >>
> >>@@ -725,9 +727,40 @@ static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct ms
> >> release_sock(sk);
> >>
> >> if (sock->type == SOCK_STREAM)
> >>- return bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
> >>+ err = bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
> >>+ else
> >>+ err = bt_sock_recvmsg(iocb, sock, msg, len, flags);
> >>+
> >>+ if (pi->chan->mode == L2CAP_MODE_ERTM) {
> >>+ int queue_err;
> >>+ int threshold;
> >>+
> >>+ /* Attempt to put pending rx data in the socket buffer */
> >>+
> >>+ lock_sock(sk);
> >>+ if (pi->rx_busy_skb) {
> >>+ queue_err = sock_queue_rcv_skb(sk, pi->rx_busy_skb);
> >>+
> >>+ if (!queue_err)
> >>+ pi->rx_busy_skb = NULL;
> >>+ }
> >>+
> >>+ /* Restore data flow when half of the receive buffer is
> >>+ * available. This avoids resending large numbers of
> >>+ * frames.
> >>+ */
> >>+ threshold = sk->sk_rcvbuf >> 1;
> >>+ if (pi->rx_busy && !pi->rx_busy_skb &&
> >>+ atomic_read(&sk->sk_rmem_alloc) <= threshold) {
> >>+
> >>+ pi->rx_busy = 0;
> >>+ l2cap_chan_busy(l2cap_pi(sk)->chan, pi->rx_busy);
> >>+ }
> >>+
> >>+ release_sock(sk);
> >
> >This is to much core logic outside of the core. I didn't think hard on this
> >but this can be simplified to just read the threshold and report core if we
> >are not busy anymore, and then get rid of rx_busy and rx_busy_skb.
>
>
> The main purpose of this patch is to change the behavior of
> l2cap_ops->recv() to better match what ERTM needs.
>
> The way sock_queue_rcv_skb() works is not a good match for ERTM.
> ERTM can't put data in to the buffer until after it is reassembled,
> so when sock_queue_rcv_skb() tells the core there's no room, the
> core needs to keep track of the skb that didn't fit.
>
> The core code is simpler if we have a recv_cb function that tells
> the core it is busy *before* it will start rejecting skbs. Then the
> core does not have to keep track of reassembled and acked skbs. The
> core knows that it gave the skb to the socket (or AMP manager) and
> does not have to worry about it any more.
>
> This patch hides socket-specific behavior in socket-specific code,
> which seems like a good fit. The core should not have to care about
> socket-specific behavior.
>
> It also delays exit of the local busy state until there is receive
> buffer space available. This results in fewer transitions in & out
> of local busy.
>
>
> I could get rid of rx_busy by querying the LOCAL_BUSY bit from the
> core.
>
>
> >
> >>+ }
> >>
> >>- return bt_sock_recvmsg(iocb, sock, msg, len, flags);
> >>+ return err;
> >> }
> >>
> >> /* Kill socket (only if zapped and orphan)
> >>@@ -811,9 +844,32 @@ static struct l2cap_chan *l2cap_sock_new_connection_cb(void *data)
> >>
> >> static int l2cap_sock_recv_cb(void *data, struct sk_buff *skb)
> >> {
> >>+ int err;
> >> struct sock *sk = data;
> >>+ struct l2cap_pinfo *pi = l2cap_pi(sk);
> >>+
> >>+ if (pi->rx_busy_skb)
> >>+ return -ENOMEM;
> >>+
> >>+ err = sock_queue_rcv_skb(sk, skb);
> >>+
> >>+ /* For ERTM, handle one skb that doesn't fit into the recv
> >>+ * buffer. This is important to do because the data frames
> >>+ * have already been acked, so the skb cannot be discarded.
> >>+ *
> >>+ * Notify the l2cap core that the buffer is full, so the
> >>+ * LOCAL_BUSY state is entered and no more frames are
> >>+ * acked and reassembled until there is buffer space
> >>+ * available.
> >>+ */
> >>+ if (err < 0 && pi->chan->mode == L2CAP_MODE_ERTM) {
> >>+ pi->rx_busy = 1;
> >>+ pi->rx_busy_skb = skb;
> >>+ l2cap_chan_busy(pi->chan, pi->rx_busy);
> >>+ err = 0;
> >>+ }
> >
> >This can totally be in l2cap_core.c. The idea is move Core stuff to
> >l2cap_core.c and not let the L2CAP users handle this. Just enter in LOCAL_BUSY
> >when recv_cb returns returns less than 0.
>
>
> I see the core's responsibility as handling the ERTM protocol - it
> receives, reassembles, and acks the data. When it has finished that
> job, and has no indication that it should be "busy", it passes that
> data to the higher layer (socket or AMP manager) and is done with
> it. This means the socket layer must keep the one skb that didn't
> fit in the receive buffer, and it puts that data in the socket
> rcvbuf in a way that is invisible to the core.
>
> This changes the local busy behavior from:
>
> * skb arrives, core reassembles it
> * Core calls socket recv_cb, error returned
> * Core puts 'end' PDU or 'unsegmented' SDU on the busy_q
> * Core sends RNR
> * Core polls socket recv_cb [up to 12 calls from core to socket]
> * Reassembly code is executed each time (but data not re-copied)
> * Core sends RR though there may be very little space in the socket
> recv buffer
>
> and instead does:
>
> * skb arrives, core reassembles it exactly once
> * Core calls socket recv_cb
> * Socket tells the core it is busy
> * Core sends RNR
> * Application calls recvmsg() until buffer is half full (no core
> interaction) [no limit other than socket timeout]
> * Socket tells the core it is not busy
> * Core sends RR, and there is socket buffer space to accept
> incoming data
>
> The new way has no iteration in the core, and all of the work that
> happens during a recvmsg() stays in the socket code until there is
> buffer space available.
>
>
> The change is also motiviated by trying to remove the busy
> workqueue, which is a good thing to do for several reasons:
>
> * Gets rid of an extra thread and associated scheduling and memory
> * The current implementation of l2cap_busy_work() does not operate
> well with multiple sockets (the workqueue thread is blocked until
> l2cap_busy_work() returns - so no)
> * The socket layer knows when the receive buffer was read from, so
> it makes sense to only attempt reassembly after recvmsg and not poll
> every 200ms.
> * Data gets to the socket layer faster if there's no wait for the
> next polling attempt.
> * Channel is disconnected after a couple of seconds, which isn't
> necessary and may break some apps
This is actually a good idea in the end, get rid of the workqueue thread and
the polling every 200ms is a good thing to do. I didn't grok the recv path
totally when I first wrote this code. ;)
So let's remove busy_rx and go got a -v2 patch. Remove busy_rx may reduce the
logic inside socket code.
Gustavo
On Fri, 1 Jul 2011, Gustavo F. Padovan wrote:
> * Mat Martineau <[email protected]> [2011-06-30 16:36:01 -0700]:
>
>>
>> Hi Gustavo -
>>
>> On Thu, 30 Jun 2011, Gustavo F. Padovan wrote:
>>
>>> * Mat Martineau <[email protected]> [2011-06-29 14:35:19 -0700]:
>>>
>>>> Even when the received tx_seq is expected, the frame still needs to be
>>>> dropped if the TX window is exceeded or the receiver is in the local
>>>> busy state.
>>>
>>> The check doesn't mean that TX window is exceeded, it means that we have an
>>> invalid tx_seq and connection should be closed. I don't see the point in
>>> moving the expected check to after this one.
>>> About the local busy check. Is it worth to drop expected frames when on local
>>> busy? What are the advantages/disadvantages? Drop will trigger SREJ while
>>> Store will press the buffer. Do you have measures to prove this?
>>
>>
>> It's possible for expected_tx_seq to be invalid if the tx window is
>> full. Therefore it is important to check for an invalid tx_seq
>> before checking if it is expected.
>>
>>
>> Dropping frames during local busy is a good idea because:
>>
>> * Queuing frames during local busy ignores the receive buffer
>> limits requested by the application with SO_RCVBUF. This problem
>> gets worse with extended window sizes, where the busy_q could be
>> quite large
>> * ERTM senders are not supposed to send frames during local busy
>> anyway
>> * The spec allows for packets to be dropped in local busy (look for
>> "StoreOrIgnore")
>>
>> It's the memory that's most important, though. Dropping all incoming
>> iframes during local busy is a simple way to keep data buffer size
>> bounded while still allowing for larger tx windows.
>>
>> If the application can't keep up with the data rate and is
>> triggering local busy, throughput isn't going to be improved
>> significatnly by queuing those frames during local busy.
>
> You convinced me, applied! Thanks.
Thanks for applying this! Have you had a chance to consider the other
local busy changes for recv_cb?
--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum
* Mat Martineau <[email protected]> [2011-06-30 16:36:01 -0700]:
>
> Hi Gustavo -
>
> On Thu, 30 Jun 2011, Gustavo F. Padovan wrote:
>
> >* Mat Martineau <[email protected]> [2011-06-29 14:35:19 -0700]:
> >
> >>Even when the received tx_seq is expected, the frame still needs to be
> >>dropped if the TX window is exceeded or the receiver is in the local
> >>busy state.
> >
> >The check doesn't mean that TX window is exceeded, it means that we have an
> >invalid tx_seq and connection should be closed. I don't see the point in
> >moving the expected check to after this one.
> >About the local busy check. Is it worth to drop expected frames when on local
> >busy? What are the advantages/disadvantages? Drop will trigger SREJ while
> >Store will press the buffer. Do you have measures to prove this?
>
>
> It's possible for expected_tx_seq to be invalid if the tx window is
> full. Therefore it is important to check for an invalid tx_seq
> before checking if it is expected.
>
>
> Dropping frames during local busy is a good idea because:
>
> * Queuing frames during local busy ignores the receive buffer
> limits requested by the application with SO_RCVBUF. This problem
> gets worse with extended window sizes, where the busy_q could be
> quite large
> * ERTM senders are not supposed to send frames during local busy
> anyway
> * The spec allows for packets to be dropped in local busy (look for
> "StoreOrIgnore")
>
> It's the memory that's most important, though. Dropping all incoming
> iframes during local busy is a simple way to keep data buffer size
> bounded while still allowing for larger tx windows.
>
> If the application can't keep up with the data rate and is
> triggering local busy, throughput isn't going to be improved
> significatnly by queuing those frames during local busy.
You convinced me, applied! Thanks.
Gustavo
Hi Gustavo -
On Thu, 30 Jun 2011, Gustavo F. Padovan wrote:
> Hi Mat,
>
> * Mat Martineau <[email protected]> [2011-06-29 14:35:23 -0700]:
>
>> This change moves most L2CAP ERTM receive buffer handling out of the
>> L2CAP core and in to the socket code. It's up to the higher layer
>> (the socket code, in this case) to tell the core when its buffer is
>> full or has space available. The recv op should always accept
>> incoming ERTM data or else the connection will go down.
>>
>> Within the socket layer, an skb that does not fit in the socket
>> receive buffer will be temporarily stored. When the socket is read
>> from, that skb will be placed in the receive buffer if possible. Once
>> adequate buffer space becomes available, the L2CAP core is informed
>> and the ERTM local busy state is cleared.
>>
>> Receive buffer management for non-ERTM modes is unchanged.
>>
>> Signed-off-by: Mat Martineau <[email protected]>
>> ---
>> include/net/bluetooth/l2cap.h | 3 ++
>> net/bluetooth/l2cap_core.c | 41 ++++++++++++++++---------
>> net/bluetooth/l2cap_sock.c | 67 +++++++++++++++++++++++++++++++++++++++--
>> 3 files changed, 93 insertions(+), 18 deletions(-)
>>
>> diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
>> index 9c18e55..82387c5 100644
>> --- a/include/net/bluetooth/l2cap.h
>> +++ b/include/net/bluetooth/l2cap.h
>> @@ -422,6 +422,8 @@ struct l2cap_conn {
>> struct l2cap_pinfo {
>> struct bt_sock bt;
>> struct l2cap_chan *chan;
>> + struct sk_buff *rx_busy_skb;
>> + int rx_busy;
>> };
>>
>> enum {
>> @@ -498,5 +500,6 @@ void l2cap_chan_close(struct l2cap_chan *chan, int reason);
>> void l2cap_chan_destroy(struct l2cap_chan *chan);
>> int l2cap_chan_connect(struct l2cap_chan *chan);
>> int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len);
>> +void l2cap_chan_busy(struct l2cap_chan *chan, int busy);
>>
>> #endif /* __L2CAP_H */
>> diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
>> index 4b764b1..f0e7ba7 100644
>> --- a/net/bluetooth/l2cap_core.c
>> +++ b/net/bluetooth/l2cap_core.c
>> @@ -3350,21 +3350,21 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
>> }
>>
>> err = l2cap_ertm_reassembly_sdu(chan, skb, control);
>> - if (err >= 0) {
>> - chan->buffer_seq = (chan->buffer_seq + 1) % 64;
>> - return err;
>> - }
>> -
>> - l2cap_ertm_enter_local_busy(chan);
>> -
>> - bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
>> - __skb_queue_tail(&chan->busy_q, skb);
>> -
>> - queue_work(_busy_wq, &chan->busy_work);
>> + chan->buffer_seq = (chan->buffer_seq + 1) % 64;
>>
>> return err;
>> }
>>
>> +void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
>> +{
>> + if (chan->mode == L2CAP_MODE_ERTM) {
>> + if (busy)
>> + l2cap_ertm_enter_local_busy(chan);
>> + else
>> + l2cap_ertm_exit_local_busy(chan);
>> + }
>> +}
>> +
>> static int l2cap_streaming_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
>> {
>> struct sk_buff *_skb;
>> @@ -3463,13 +3463,22 @@ static void l2cap_check_srej_gap(struct l2cap_chan *chan, u8 tx_seq)
>> struct sk_buff *skb;
>> u16 control;
>>
>> - while ((skb = skb_peek(&chan->srej_q))) {
>> + while ((skb = skb_peek(&chan->srej_q)) &&
>> + !test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
>> + int err;
>> +
>> if (bt_cb(skb)->tx_seq != tx_seq)
>> break;
>>
>> skb = skb_dequeue(&chan->srej_q);
>> control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
>> - l2cap_ertm_reassembly_sdu(chan, skb, control);
>> + err = l2cap_ertm_reassembly_sdu(chan, skb, control);
>> +
>> + if (err < 0) {
>> + l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
>> + break;
>> + }
>> +
>> chan->buffer_seq_srej =
>> (chan->buffer_seq_srej + 1) % 64;
>> tx_seq = (tx_seq + 1) % 64;
>> @@ -3625,8 +3634,10 @@ expected:
>> }
>>
>> err = l2cap_push_rx_skb(chan, skb, rx_control);
>> - if (err < 0)
>> - return 0;
>> + if (err < 0) {
>> + l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
>> + return err;
>> + }
>>
>> if (rx_control & L2CAP_CTRL_FINAL) {
>> if (!test_and_clear_bit(CONN_REJ_ACT, &chan->conn_state))
>> diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c
>> index 39082d4..ab0494b 100644
>> --- a/net/bluetooth/l2cap_sock.c
>> +++ b/net/bluetooth/l2cap_sock.c
>> @@ -711,6 +711,8 @@ static int l2cap_sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct ms
>> static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t len, int flags)
>> {
>> struct sock *sk = sock->sk;
>> + struct l2cap_pinfo *pi = l2cap_pi(sk);
>> + int err;
>>
>> lock_sock(sk);
>>
>> @@ -725,9 +727,40 @@ static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct ms
>> release_sock(sk);
>>
>> if (sock->type == SOCK_STREAM)
>> - return bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
>> + err = bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
>> + else
>> + err = bt_sock_recvmsg(iocb, sock, msg, len, flags);
>> +
>> + if (pi->chan->mode == L2CAP_MODE_ERTM) {
>> + int queue_err;
>> + int threshold;
>> +
>> + /* Attempt to put pending rx data in the socket buffer */
>> +
>> + lock_sock(sk);
>> + if (pi->rx_busy_skb) {
>> + queue_err = sock_queue_rcv_skb(sk, pi->rx_busy_skb);
>> +
>> + if (!queue_err)
>> + pi->rx_busy_skb = NULL;
>> + }
>> +
>> + /* Restore data flow when half of the receive buffer is
>> + * available. This avoids resending large numbers of
>> + * frames.
>> + */
>> + threshold = sk->sk_rcvbuf >> 1;
>> + if (pi->rx_busy && !pi->rx_busy_skb &&
>> + atomic_read(&sk->sk_rmem_alloc) <= threshold) {
>> +
>> + pi->rx_busy = 0;
>> + l2cap_chan_busy(l2cap_pi(sk)->chan, pi->rx_busy);
>> + }
>> +
>> + release_sock(sk);
>
> This is to much core logic outside of the core. I didn't think hard on this
> but this can be simplified to just read the threshold and report core if we
> are not busy anymore, and then get rid of rx_busy and rx_busy_skb.
The main purpose of this patch is to change the behavior of
l2cap_ops->recv() to better match what ERTM needs.
The way sock_queue_rcv_skb() works is not a good match for ERTM.
ERTM can't put data in to the buffer until after it is reassembled, so
when sock_queue_rcv_skb() tells the core there's no room, the core
needs to keep track of the skb that didn't fit.
The core code is simpler if we have a recv_cb function that tells the
core it is busy *before* it will start rejecting skbs. Then the core
does not have to keep track of reassembled and acked skbs. The core
knows that it gave the skb to the socket (or AMP manager) and does not
have to worry about it any more.
This patch hides socket-specific behavior in socket-specific code,
which seems like a good fit. The core should not have to care about
socket-specific behavior.
It also delays exit of the local busy state until there is receive
buffer space available. This results in fewer transitions in & out of
local busy.
I could get rid of rx_busy by querying the LOCAL_BUSY bit from
the core.
>
>> + }
>>
>> - return bt_sock_recvmsg(iocb, sock, msg, len, flags);
>> + return err;
>> }
>>
>> /* Kill socket (only if zapped and orphan)
>> @@ -811,9 +844,32 @@ static struct l2cap_chan *l2cap_sock_new_connection_cb(void *data)
>>
>> static int l2cap_sock_recv_cb(void *data, struct sk_buff *skb)
>> {
>> + int err;
>> struct sock *sk = data;
>> + struct l2cap_pinfo *pi = l2cap_pi(sk);
>> +
>> + if (pi->rx_busy_skb)
>> + return -ENOMEM;
>> +
>> + err = sock_queue_rcv_skb(sk, skb);
>> +
>> + /* For ERTM, handle one skb that doesn't fit into the recv
>> + * buffer. This is important to do because the data frames
>> + * have already been acked, so the skb cannot be discarded.
>> + *
>> + * Notify the l2cap core that the buffer is full, so the
>> + * LOCAL_BUSY state is entered and no more frames are
>> + * acked and reassembled until there is buffer space
>> + * available.
>> + */
>> + if (err < 0 && pi->chan->mode == L2CAP_MODE_ERTM) {
>> + pi->rx_busy = 1;
>> + pi->rx_busy_skb = skb;
>> + l2cap_chan_busy(pi->chan, pi->rx_busy);
>> + err = 0;
>> + }
>
> This can totally be in l2cap_core.c. The idea is move Core stuff to
> l2cap_core.c and not let the L2CAP users handle this. Just enter in LOCAL_BUSY
> when recv_cb returns returns less than 0.
I see the core's responsibility as handling the ERTM protocol - it
receives, reassembles, and acks the data. When it has finished that
job, and has no indication that it should be "busy", it passes that
data to the higher layer (socket or AMP manager) and is done with it.
This means the socket layer must keep the one skb that didn't fit in
the receive buffer, and it puts that data in the socket rcvbuf in a
way that is invisible to the core.
This changes the local busy behavior from:
* skb arrives, core reassembles it
* Core calls socket recv_cb, error returned
* Core puts 'end' PDU or 'unsegmented' SDU on the busy_q
* Core sends RNR
* Core polls socket recv_cb [up to 12 calls from core to socket]
* Reassembly code is executed each time (but data not re-copied)
* Core sends RR though there may be very little space in the socket
recv buffer
and instead does:
* skb arrives, core reassembles it exactly once
* Core calls socket recv_cb
* Socket tells the core it is busy
* Core sends RNR
* Application calls recvmsg() until buffer is half full (no core
interaction) [no limit other than socket timeout]
* Socket tells the core it is not busy
* Core sends RR, and there is socket buffer space to accept incoming
data
The new way has no iteration in the core, and all of the work that
happens during a recvmsg() stays in the socket code until there is
buffer space available.
The change is also motiviated by trying to remove the busy workqueue,
which is a good thing to do for several reasons:
* Gets rid of an extra thread and associated scheduling and memory
* The current implementation of l2cap_busy_work() does not operate
well with multiple sockets (the workqueue thread is blocked until
l2cap_busy_work() returns - so no)
* The socket layer knows when the receive buffer was read from, so it
makes sense to only attempt reassembly after recvmsg and not poll
every 200ms.
* Data gets to the socket layer faster if there's no wait for the
next polling attempt.
* Channel is disconnected after a couple of seconds, which isn't
necessary and may break some apps
As I commented on patch 1 of this set, I'd also like to discard
packets during local busy and get rid of the busy_q (to limit memory
usage). Since the core currently places skbs on the busy_q when
recv_cb returns an error, there needs to be a different place for the
skb to go when sock_queue_rcv_skb() returns an error. The socket code
seemed like a good place for this, as I explained above.
--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum