2011-07-06 21:54:29

by Mat Martineau

[permalink] [raw]
Subject: [PATCH v2 0/3] ERTM local busy enhancement

These patches change receiver-side handling of flow control. This new
approach will not keep very much incoming data beyond what fits in the
socket receive buffer, so memory use is more tightly controlled. When
incoming data does not immediately fit in the socket buffer, the L2CAP
socket layer will now only check for available space when the socket
is read from instead of polling on a workqueue thread.


Mat Martineau (3):
Bluetooth: Move code for ERTM local busy state to separate functions
Bluetooth: Use event-driven approach for handling ERTM receive buffer
Bluetooth: Remove L2CAP busy queue

include/net/bluetooth/l2cap.h | 6 +-
net/bluetooth/l2cap_core.c | 174 +++++++++++------------------------------
net/bluetooth/l2cap_sock.c | 66 +++++++++++++++-
3 files changed, 109 insertions(+), 137 deletions(-)

--
1.7.6

--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum


2011-07-07 15:43:40

by Mat Martineau

[permalink] [raw]
Subject: Re: [PATCH v2 2/3] Bluetooth: Use event-driven approach for handling ERTM receive buffer


Hi Gustavo -

On Wed, 6 Jul 2011, Gustavo Padovan wrote:

> Hi Mat,
>
> * Mat Martineau <[email protected]> [2011-07-06 16:01:43 -0700]:
>
>>
>> Hi Gustavo -
>>
>> On Wed, 6 Jul 2011, Gustavo Padovan wrote:
>>
>>> Hi Mat,
>>>
>>> * Mat Martineau <[email protected]> [2011-07-06 14:54:31 -0700]:
>>>
>>>> This change moves most L2CAP ERTM receive buffer handling out of the
>>>> L2CAP core and in to the socket code. It's up to the higher layer
>>>> (the socket code, in this case) to tell the core when its buffer is
>>>> full or has space available. The recv op should always accept
>>>> incoming ERTM data or else the connection will go down.
>>>>
>>>> Within the socket layer, an skb that does not fit in the socket
>>>> receive buffer will be temporarily stored. When the socket is read
>>>> from, that skb will be placed in the receive buffer if possible. Once
>>>> adequate buffer space becomes available, the L2CAP core is informed
>>>> and the ERTM local busy state is cleared.
>>>>
>>>> Receive buffer management for non-ERTM modes is unchanged.
>>>>
>>>> Signed-off-by: Mat Martineau <[email protected]>
>>>> ---
>>>> include/net/bluetooth/l2cap.h | 2 +
>>>> net/bluetooth/l2cap_core.c | 41 ++++++++++++++++---------
>>>> net/bluetooth/l2cap_sock.c | 66 ++++++++++++++++++++++++++++++++++++++--
>>>> 3 files changed, 90 insertions(+), 19 deletions(-)
>>>>
>>>> diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
>>>> index 9c18e55..66b8d96 100644
>>>> --- a/include/net/bluetooth/l2cap.h
>>>> +++ b/include/net/bluetooth/l2cap.h
>>>> @@ -422,6 +422,7 @@ struct l2cap_conn {
>>>> struct l2cap_pinfo {
>>>> struct bt_sock bt;
>>>> struct l2cap_chan *chan;
>>>> + struct sk_buff *rx_busy_skb;
>>>> };
>>>>
>>>> enum {
>>>> @@ -498,5 +499,6 @@ void l2cap_chan_close(struct l2cap_chan *chan, int reason);
>>>> void l2cap_chan_destroy(struct l2cap_chan *chan);
>>>> int l2cap_chan_connect(struct l2cap_chan *chan);
>>>> int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len);
>>>> +void l2cap_chan_busy(struct l2cap_chan *chan, int busy);
>>>>
>>>> #endif /* __L2CAP_H */
>>>> diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
>>>> index f7ada4a..ea9c7d0 100644
>>>> --- a/net/bluetooth/l2cap_core.c
>>>> +++ b/net/bluetooth/l2cap_core.c
>>>> @@ -3350,21 +3350,21 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
>>>> }
>>>>
>>>> err = l2cap_ertm_reassembly_sdu(chan, skb, control);
>>>> - if (err >= 0) {
>>>> - chan->buffer_seq = (chan->buffer_seq + 1) % 64;
>>>> - return err;
>>>> - }
>>>> -
>>>> - l2cap_ertm_enter_local_busy(chan);
>>>> -
>>>> - bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
>>>> - __skb_queue_tail(&chan->busy_q, skb);
>>>> -
>>>> - queue_work(_busy_wq, &chan->busy_work);
>>>> + chan->buffer_seq = (chan->buffer_seq + 1) % 64;
>>>>
>>>> return err;
>>>> }
>>>>
>>>> +void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
>>>> +{
>>>> + if (chan->mode == L2CAP_MODE_ERTM) {
>>>> + if (busy)
>>>> + l2cap_ertm_enter_local_busy(chan);
>>>> + else
>>>> + l2cap_ertm_exit_local_busy(chan);
>>>> + }
>>>> +}
>>>> +
>>>> static int l2cap_streaming_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
>>>> {
>>>> struct sk_buff *_skb;
>>>> @@ -3463,13 +3463,22 @@ static void l2cap_check_srej_gap(struct l2cap_chan *chan, u8 tx_seq)
>>>> struct sk_buff *skb;
>>>> u16 control;
>>>>
>>>> - while ((skb = skb_peek(&chan->srej_q))) {
>>>> + while ((skb = skb_peek(&chan->srej_q)) &&
>>>> + !test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
>>>> + int err;
>>>> +
>>>> if (bt_cb(skb)->tx_seq != tx_seq)
>>>> break;
>>>>
>>>> skb = skb_dequeue(&chan->srej_q);
>>>> control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
>>>> - l2cap_ertm_reassembly_sdu(chan, skb, control);
>>>> + err = l2cap_ertm_reassembly_sdu(chan, skb, control);
>>>> +
>>>> + if (err < 0) {
>>>> + l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
>>>> + break;
>>>> + }
>>>> +
>>>> chan->buffer_seq_srej =
>>>> (chan->buffer_seq_srej + 1) % 64;
>>>> tx_seq = (tx_seq + 1) % 64;
>>>> @@ -3625,8 +3634,10 @@ expected:
>>>> }
>>>>
>>>> err = l2cap_push_rx_skb(chan, skb, rx_control);
>>>> - if (err < 0)
>>>> - return 0;
>>>> + if (err < 0) {
>>>> + l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
>>>> + return err;
>>>> + }
>>>>
>>>> if (rx_control & L2CAP_CTRL_FINAL) {
>>>> if (!test_and_clear_bit(CONN_REJ_ACT, &chan->conn_state))
>>>> diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c
>>>> index 39082d4..9bf7313 100644
>>>> --- a/net/bluetooth/l2cap_sock.c
>>>> +++ b/net/bluetooth/l2cap_sock.c
>>>> @@ -711,13 +711,15 @@ static int l2cap_sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct ms
>>>> static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t len, int flags)
>>>> {
>>>> struct sock *sk = sock->sk;
>>>> + struct l2cap_pinfo *pi = l2cap_pi(sk);
>>>> + int err;
>>>>
>>>> lock_sock(sk);
>>>>
>>>> if (sk->sk_state == BT_CONNECT2 && bt_sk(sk)->defer_setup) {
>>>> sk->sk_state = BT_CONFIG;
>>>>
>>>> - __l2cap_connect_rsp_defer(l2cap_pi(sk)->chan);
>>>> + __l2cap_connect_rsp_defer(pi->chan);
>>>> release_sock(sk);
>>>> return 0;
>>>> }
>>>> @@ -725,9 +727,38 @@ static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct ms
>>>> release_sock(sk);
>>>>
>>>> if (sock->type == SOCK_STREAM)
>>>> - return bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
>>>> + err = bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
>>>> + else
>>>> + err = bt_sock_recvmsg(iocb, sock, msg, len, flags);
>>>> +
>>>> + if (pi->chan->mode == L2CAP_MODE_ERTM) {
>>>
>>> if (pi->chan->mode != L2CAP_MODE_ERTM) {
>>> return err;
>>>
>>> lock_sock()...
>>
>> Ok.
>>
>>>> + int threshold;
>>>> +
>>>> + /* Attempt to put pending rx data in the socket buffer */
>>>> +
>>>> + lock_sock(sk);
>>>> + if (pi->rx_busy_skb) {
>>>> + int queue_err;
>>>> + queue_err = sock_queue_rcv_skb(sk, pi->rx_busy_skb);
>>>> +
>>>> + if (!queue_err)
>>>> + pi->rx_busy_skb = NULL;
>>>
>>> if (!sock_queue_rcv_skb())
>>> pi->rx_busy_skb = NULL;
>>> else
>>> return err;
>>>
>>> What I understood is that if sock_queue_rcv_skb() fails there is no need to
>>> check if there is available space in the buffer, then we just return. The
>>> locks also needs to be released.
>>
>> You're right - it's better to make sure we don't exit local busy in
>> this case.
>>
>>>> + }
>>>> +
>>>> + /* Restore data flow when half of the receive buffer is
>>>> + * available. This avoids resending large numbers of
>>>> + * frames.
>>>> + */
>>>> + threshold = sk->sk_rcvbuf >> 1;
>>>> + if (test_bit(CONN_LOCAL_BUSY, &pi->chan->conn_state) &&
>>>> + !pi->rx_busy_skb &&
>>>> + atomic_read(&sk->sk_rmem_alloc) <= threshold)
>>>> + l2cap_chan_busy(pi->chan, 0);
>>>
>>> Then this if can be just
>>>
>>> if (atomic_read(sk_rmem_alloc) <= sk_rcvbuf >> 1)
>>> l2cap_chan_busy(pi->chan, 0);
>>>
>>
>> It's still important to see if the channel is currently LOCAL_BUSY,
>> this code path will be executed every time an ERTM socket is read
>> from (busy or not).
>
> what about this in beginning?
>
> if (pi->chan->mode != L2CAP_MODE_ERTM || !LOCAL_BUSY) {
>
> Then we avoid all the rest of the code if we are in Local Busy.

The lock needs to be held to check the local busy bit. I can check
LOCAL_BUSY after the lock, and release/return early if not busy.


--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum

2011-07-06 23:43:51

by Gustavo Padovan

[permalink] [raw]
Subject: Re: [PATCH v2 2/3] Bluetooth: Use event-driven approach for handling ERTM receive buffer

Hi Mat,

* Mat Martineau <[email protected]> [2011-07-06 16:01:43 -0700]:

>
> Hi Gustavo -
>
> On Wed, 6 Jul 2011, Gustavo Padovan wrote:
>
> >Hi Mat,
> >
> >* Mat Martineau <[email protected]> [2011-07-06 14:54:31 -0700]:
> >
> >>This change moves most L2CAP ERTM receive buffer handling out of the
> >>L2CAP core and in to the socket code. It's up to the higher layer
> >>(the socket code, in this case) to tell the core when its buffer is
> >>full or has space available. The recv op should always accept
> >>incoming ERTM data or else the connection will go down.
> >>
> >>Within the socket layer, an skb that does not fit in the socket
> >>receive buffer will be temporarily stored. When the socket is read
> >>from, that skb will be placed in the receive buffer if possible. Once
> >>adequate buffer space becomes available, the L2CAP core is informed
> >>and the ERTM local busy state is cleared.
> >>
> >>Receive buffer management for non-ERTM modes is unchanged.
> >>
> >>Signed-off-by: Mat Martineau <[email protected]>
> >>---
> >> include/net/bluetooth/l2cap.h | 2 +
> >> net/bluetooth/l2cap_core.c | 41 ++++++++++++++++---------
> >> net/bluetooth/l2cap_sock.c | 66 ++++++++++++++++++++++++++++++++++++++--
> >> 3 files changed, 90 insertions(+), 19 deletions(-)
> >>
> >>diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
> >>index 9c18e55..66b8d96 100644
> >>--- a/include/net/bluetooth/l2cap.h
> >>+++ b/include/net/bluetooth/l2cap.h
> >>@@ -422,6 +422,7 @@ struct l2cap_conn {
> >> struct l2cap_pinfo {
> >> struct bt_sock bt;
> >> struct l2cap_chan *chan;
> >>+ struct sk_buff *rx_busy_skb;
> >> };
> >>
> >> enum {
> >>@@ -498,5 +499,6 @@ void l2cap_chan_close(struct l2cap_chan *chan, int reason);
> >> void l2cap_chan_destroy(struct l2cap_chan *chan);
> >> int l2cap_chan_connect(struct l2cap_chan *chan);
> >> int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len);
> >>+void l2cap_chan_busy(struct l2cap_chan *chan, int busy);
> >>
> >> #endif /* __L2CAP_H */
> >>diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
> >>index f7ada4a..ea9c7d0 100644
> >>--- a/net/bluetooth/l2cap_core.c
> >>+++ b/net/bluetooth/l2cap_core.c
> >>@@ -3350,21 +3350,21 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
> >> }
> >>
> >> err = l2cap_ertm_reassembly_sdu(chan, skb, control);
> >>- if (err >= 0) {
> >>- chan->buffer_seq = (chan->buffer_seq + 1) % 64;
> >>- return err;
> >>- }
> >>-
> >>- l2cap_ertm_enter_local_busy(chan);
> >>-
> >>- bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
> >>- __skb_queue_tail(&chan->busy_q, skb);
> >>-
> >>- queue_work(_busy_wq, &chan->busy_work);
> >>+ chan->buffer_seq = (chan->buffer_seq + 1) % 64;
> >>
> >> return err;
> >> }
> >>
> >>+void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
> >>+{
> >>+ if (chan->mode == L2CAP_MODE_ERTM) {
> >>+ if (busy)
> >>+ l2cap_ertm_enter_local_busy(chan);
> >>+ else
> >>+ l2cap_ertm_exit_local_busy(chan);
> >>+ }
> >>+}
> >>+
> >> static int l2cap_streaming_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
> >> {
> >> struct sk_buff *_skb;
> >>@@ -3463,13 +3463,22 @@ static void l2cap_check_srej_gap(struct l2cap_chan *chan, u8 tx_seq)
> >> struct sk_buff *skb;
> >> u16 control;
> >>
> >>- while ((skb = skb_peek(&chan->srej_q))) {
> >>+ while ((skb = skb_peek(&chan->srej_q)) &&
> >>+ !test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
> >>+ int err;
> >>+
> >> if (bt_cb(skb)->tx_seq != tx_seq)
> >> break;
> >>
> >> skb = skb_dequeue(&chan->srej_q);
> >> control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
> >>- l2cap_ertm_reassembly_sdu(chan, skb, control);
> >>+ err = l2cap_ertm_reassembly_sdu(chan, skb, control);
> >>+
> >>+ if (err < 0) {
> >>+ l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
> >>+ break;
> >>+ }
> >>+
> >> chan->buffer_seq_srej =
> >> (chan->buffer_seq_srej + 1) % 64;
> >> tx_seq = (tx_seq + 1) % 64;
> >>@@ -3625,8 +3634,10 @@ expected:
> >> }
> >>
> >> err = l2cap_push_rx_skb(chan, skb, rx_control);
> >>- if (err < 0)
> >>- return 0;
> >>+ if (err < 0) {
> >>+ l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
> >>+ return err;
> >>+ }
> >>
> >> if (rx_control & L2CAP_CTRL_FINAL) {
> >> if (!test_and_clear_bit(CONN_REJ_ACT, &chan->conn_state))
> >>diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c
> >>index 39082d4..9bf7313 100644
> >>--- a/net/bluetooth/l2cap_sock.c
> >>+++ b/net/bluetooth/l2cap_sock.c
> >>@@ -711,13 +711,15 @@ static int l2cap_sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct ms
> >> static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t len, int flags)
> >> {
> >> struct sock *sk = sock->sk;
> >>+ struct l2cap_pinfo *pi = l2cap_pi(sk);
> >>+ int err;
> >>
> >> lock_sock(sk);
> >>
> >> if (sk->sk_state == BT_CONNECT2 && bt_sk(sk)->defer_setup) {
> >> sk->sk_state = BT_CONFIG;
> >>
> >>- __l2cap_connect_rsp_defer(l2cap_pi(sk)->chan);
> >>+ __l2cap_connect_rsp_defer(pi->chan);
> >> release_sock(sk);
> >> return 0;
> >> }
> >>@@ -725,9 +727,38 @@ static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct ms
> >> release_sock(sk);
> >>
> >> if (sock->type == SOCK_STREAM)
> >>- return bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
> >>+ err = bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
> >>+ else
> >>+ err = bt_sock_recvmsg(iocb, sock, msg, len, flags);
> >>+
> >>+ if (pi->chan->mode == L2CAP_MODE_ERTM) {
> >
> > if (pi->chan->mode != L2CAP_MODE_ERTM) {
> > return err;
> >
> > lock_sock()...
>
> Ok.
>
> >>+ int threshold;
> >>+
> >>+ /* Attempt to put pending rx data in the socket buffer */
> >>+
> >>+ lock_sock(sk);
> >>+ if (pi->rx_busy_skb) {
> >>+ int queue_err;
> >>+ queue_err = sock_queue_rcv_skb(sk, pi->rx_busy_skb);
> >>+
> >>+ if (!queue_err)
> >>+ pi->rx_busy_skb = NULL;
> >
> > if (!sock_queue_rcv_skb())
> > pi->rx_busy_skb = NULL;
> > else
> > return err;
> >
> >What I understood is that if sock_queue_rcv_skb() fails there is no need to
> >check if there is available space in the buffer, then we just return. The
> >locks also needs to be released.
>
> You're right - it's better to make sure we don't exit local busy in
> this case.
>
> >>+ }
> >>+
> >>+ /* Restore data flow when half of the receive buffer is
> >>+ * available. This avoids resending large numbers of
> >>+ * frames.
> >>+ */
> >>+ threshold = sk->sk_rcvbuf >> 1;
> >>+ if (test_bit(CONN_LOCAL_BUSY, &pi->chan->conn_state) &&
> >>+ !pi->rx_busy_skb &&
> >>+ atomic_read(&sk->sk_rmem_alloc) <= threshold)
> >>+ l2cap_chan_busy(pi->chan, 0);
> >
> >Then this if can be just
> >
> > if (atomic_read(sk_rmem_alloc) <= sk_rcvbuf >> 1)
> > l2cap_chan_busy(pi->chan, 0);
> >
>
> It's still important to see if the channel is currently LOCAL_BUSY,
> this code path will be executed every time an ERTM socket is read
> from (busy or not).

what about this in beginning?

if (pi->chan->mode != L2CAP_MODE_ERTM || !LOCAL_BUSY) {

Then we avoid all the rest of the code if we are in Local Busy.

Gustavo

2011-07-06 23:01:43

by Mat Martineau

[permalink] [raw]
Subject: Re: [PATCH v2 2/3] Bluetooth: Use event-driven approach for handling ERTM receive buffer


Hi Gustavo -

On Wed, 6 Jul 2011, Gustavo Padovan wrote:

> Hi Mat,
>
> * Mat Martineau <[email protected]> [2011-07-06 14:54:31 -0700]:
>
>> This change moves most L2CAP ERTM receive buffer handling out of the
>> L2CAP core and in to the socket code. It's up to the higher layer
>> (the socket code, in this case) to tell the core when its buffer is
>> full or has space available. The recv op should always accept
>> incoming ERTM data or else the connection will go down.
>>
>> Within the socket layer, an skb that does not fit in the socket
>> receive buffer will be temporarily stored. When the socket is read
>> from, that skb will be placed in the receive buffer if possible. Once
>> adequate buffer space becomes available, the L2CAP core is informed
>> and the ERTM local busy state is cleared.
>>
>> Receive buffer management for non-ERTM modes is unchanged.
>>
>> Signed-off-by: Mat Martineau <[email protected]>
>> ---
>> include/net/bluetooth/l2cap.h | 2 +
>> net/bluetooth/l2cap_core.c | 41 ++++++++++++++++---------
>> net/bluetooth/l2cap_sock.c | 66 ++++++++++++++++++++++++++++++++++++++--
>> 3 files changed, 90 insertions(+), 19 deletions(-)
>>
>> diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
>> index 9c18e55..66b8d96 100644
>> --- a/include/net/bluetooth/l2cap.h
>> +++ b/include/net/bluetooth/l2cap.h
>> @@ -422,6 +422,7 @@ struct l2cap_conn {
>> struct l2cap_pinfo {
>> struct bt_sock bt;
>> struct l2cap_chan *chan;
>> + struct sk_buff *rx_busy_skb;
>> };
>>
>> enum {
>> @@ -498,5 +499,6 @@ void l2cap_chan_close(struct l2cap_chan *chan, int reason);
>> void l2cap_chan_destroy(struct l2cap_chan *chan);
>> int l2cap_chan_connect(struct l2cap_chan *chan);
>> int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len);
>> +void l2cap_chan_busy(struct l2cap_chan *chan, int busy);
>>
>> #endif /* __L2CAP_H */
>> diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
>> index f7ada4a..ea9c7d0 100644
>> --- a/net/bluetooth/l2cap_core.c
>> +++ b/net/bluetooth/l2cap_core.c
>> @@ -3350,21 +3350,21 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
>> }
>>
>> err = l2cap_ertm_reassembly_sdu(chan, skb, control);
>> - if (err >= 0) {
>> - chan->buffer_seq = (chan->buffer_seq + 1) % 64;
>> - return err;
>> - }
>> -
>> - l2cap_ertm_enter_local_busy(chan);
>> -
>> - bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
>> - __skb_queue_tail(&chan->busy_q, skb);
>> -
>> - queue_work(_busy_wq, &chan->busy_work);
>> + chan->buffer_seq = (chan->buffer_seq + 1) % 64;
>>
>> return err;
>> }
>>
>> +void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
>> +{
>> + if (chan->mode == L2CAP_MODE_ERTM) {
>> + if (busy)
>> + l2cap_ertm_enter_local_busy(chan);
>> + else
>> + l2cap_ertm_exit_local_busy(chan);
>> + }
>> +}
>> +
>> static int l2cap_streaming_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
>> {
>> struct sk_buff *_skb;
>> @@ -3463,13 +3463,22 @@ static void l2cap_check_srej_gap(struct l2cap_chan *chan, u8 tx_seq)
>> struct sk_buff *skb;
>> u16 control;
>>
>> - while ((skb = skb_peek(&chan->srej_q))) {
>> + while ((skb = skb_peek(&chan->srej_q)) &&
>> + !test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
>> + int err;
>> +
>> if (bt_cb(skb)->tx_seq != tx_seq)
>> break;
>>
>> skb = skb_dequeue(&chan->srej_q);
>> control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
>> - l2cap_ertm_reassembly_sdu(chan, skb, control);
>> + err = l2cap_ertm_reassembly_sdu(chan, skb, control);
>> +
>> + if (err < 0) {
>> + l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
>> + break;
>> + }
>> +
>> chan->buffer_seq_srej =
>> (chan->buffer_seq_srej + 1) % 64;
>> tx_seq = (tx_seq + 1) % 64;
>> @@ -3625,8 +3634,10 @@ expected:
>> }
>>
>> err = l2cap_push_rx_skb(chan, skb, rx_control);
>> - if (err < 0)
>> - return 0;
>> + if (err < 0) {
>> + l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
>> + return err;
>> + }
>>
>> if (rx_control & L2CAP_CTRL_FINAL) {
>> if (!test_and_clear_bit(CONN_REJ_ACT, &chan->conn_state))
>> diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c
>> index 39082d4..9bf7313 100644
>> --- a/net/bluetooth/l2cap_sock.c
>> +++ b/net/bluetooth/l2cap_sock.c
>> @@ -711,13 +711,15 @@ static int l2cap_sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct ms
>> static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t len, int flags)
>> {
>> struct sock *sk = sock->sk;
>> + struct l2cap_pinfo *pi = l2cap_pi(sk);
>> + int err;
>>
>> lock_sock(sk);
>>
>> if (sk->sk_state == BT_CONNECT2 && bt_sk(sk)->defer_setup) {
>> sk->sk_state = BT_CONFIG;
>>
>> - __l2cap_connect_rsp_defer(l2cap_pi(sk)->chan);
>> + __l2cap_connect_rsp_defer(pi->chan);
>> release_sock(sk);
>> return 0;
>> }
>> @@ -725,9 +727,38 @@ static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct ms
>> release_sock(sk);
>>
>> if (sock->type == SOCK_STREAM)
>> - return bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
>> + err = bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
>> + else
>> + err = bt_sock_recvmsg(iocb, sock, msg, len, flags);
>> +
>> + if (pi->chan->mode == L2CAP_MODE_ERTM) {
>
> if (pi->chan->mode != L2CAP_MODE_ERTM) {
> return err;
>
> lock_sock()...

Ok.

>> + int threshold;
>> +
>> + /* Attempt to put pending rx data in the socket buffer */
>> +
>> + lock_sock(sk);
>> + if (pi->rx_busy_skb) {
>> + int queue_err;
>> + queue_err = sock_queue_rcv_skb(sk, pi->rx_busy_skb);
>> +
>> + if (!queue_err)
>> + pi->rx_busy_skb = NULL;
>
> if (!sock_queue_rcv_skb())
> pi->rx_busy_skb = NULL;
> else
> return err;
>
> What I understood is that if sock_queue_rcv_skb() fails there is no need to
> check if there is available space in the buffer, then we just return. The
> locks also needs to be released.

You're right - it's better to make sure we don't exit local busy in
this case.

>> + }
>> +
>> + /* Restore data flow when half of the receive buffer is
>> + * available. This avoids resending large numbers of
>> + * frames.
>> + */
>> + threshold = sk->sk_rcvbuf >> 1;
>> + if (test_bit(CONN_LOCAL_BUSY, &pi->chan->conn_state) &&
>> + !pi->rx_busy_skb &&
>> + atomic_read(&sk->sk_rmem_alloc) <= threshold)
>> + l2cap_chan_busy(pi->chan, 0);
>
> Then this if can be just
>
> if (atomic_read(sk_rmem_alloc) <= sk_rcvbuf >> 1)
> l2cap_chan_busy(pi->chan, 0);
>

It's still important to see if the channel is currently LOCAL_BUSY,
this code path will be executed every time an ERTM socket is read
from (busy or not).

>> +
>> + release_sock(sk);
>> + }
>>
>> - return bt_sock_recvmsg(iocb, sock, msg, len, flags);
>> + return err;
>> }
>>
>> /* Kill socket (only if zapped and orphan)
>> @@ -811,9 +842,31 @@ static struct l2cap_chan *l2cap_sock_new_connection_cb(void *data)
>>
>> static int l2cap_sock_recv_cb(void *data, struct sk_buff *skb)
>> {
>> + int err;
>> struct sock *sk = data;
>> + struct l2cap_pinfo *pi = l2cap_pi(sk);
>> +
>> + if (pi->rx_busy_skb)
>> + return -ENOMEM;
>>
>> - return sock_queue_rcv_skb(sk, skb);
>> + err = sock_queue_rcv_skb(sk, skb);
>> +
>> + /* For ERTM, handle one skb that doesn't fit into the recv
>> + * buffer. This is important to do because the data frames
>> + * have already been acked, so the skb cannot be discarded.
>> + *
>> + * Notify the l2cap core that the buffer is full, so the
>> + * LOCAL_BUSY state is entered and no more frames are
>> + * acked and reassembled until there is buffer space
>> + * available.
>> + */
>> + if (err < 0 && pi->chan->mode == L2CAP_MODE_ERTM) {
>> + pi->rx_busy_skb = skb;
>> + l2cap_chan_busy(pi->chan, 1);
>> + err = 0;
>> + }
>> +
>> + return err;
>
> You always return 0 here, remove err;

It might not be 0 in basic mode or streaming mode, or raw. In these
cases, the error code needs to be returned so the skb is freed.


Thanks for the feedback, I'll get to work on v3.

--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum

2011-07-06 22:36:35

by Gustavo Padovan

[permalink] [raw]
Subject: Re: [PATCH v2 2/3] Bluetooth: Use event-driven approach for handling ERTM receive buffer

Hi Mat,

* Mat Martineau <[email protected]> [2011-07-06 14:54:31 -0700]:

> This change moves most L2CAP ERTM receive buffer handling out of the
> L2CAP core and in to the socket code. It's up to the higher layer
> (the socket code, in this case) to tell the core when its buffer is
> full or has space available. The recv op should always accept
> incoming ERTM data or else the connection will go down.
>
> Within the socket layer, an skb that does not fit in the socket
> receive buffer will be temporarily stored. When the socket is read
> from, that skb will be placed in the receive buffer if possible. Once
> adequate buffer space becomes available, the L2CAP core is informed
> and the ERTM local busy state is cleared.
>
> Receive buffer management for non-ERTM modes is unchanged.
>
> Signed-off-by: Mat Martineau <[email protected]>
> ---
> include/net/bluetooth/l2cap.h | 2 +
> net/bluetooth/l2cap_core.c | 41 ++++++++++++++++---------
> net/bluetooth/l2cap_sock.c | 66 ++++++++++++++++++++++++++++++++++++++--
> 3 files changed, 90 insertions(+), 19 deletions(-)
>
> diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
> index 9c18e55..66b8d96 100644
> --- a/include/net/bluetooth/l2cap.h
> +++ b/include/net/bluetooth/l2cap.h
> @@ -422,6 +422,7 @@ struct l2cap_conn {
> struct l2cap_pinfo {
> struct bt_sock bt;
> struct l2cap_chan *chan;
> + struct sk_buff *rx_busy_skb;
> };
>
> enum {
> @@ -498,5 +499,6 @@ void l2cap_chan_close(struct l2cap_chan *chan, int reason);
> void l2cap_chan_destroy(struct l2cap_chan *chan);
> int l2cap_chan_connect(struct l2cap_chan *chan);
> int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len);
> +void l2cap_chan_busy(struct l2cap_chan *chan, int busy);
>
> #endif /* __L2CAP_H */
> diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
> index f7ada4a..ea9c7d0 100644
> --- a/net/bluetooth/l2cap_core.c
> +++ b/net/bluetooth/l2cap_core.c
> @@ -3350,21 +3350,21 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
> }
>
> err = l2cap_ertm_reassembly_sdu(chan, skb, control);
> - if (err >= 0) {
> - chan->buffer_seq = (chan->buffer_seq + 1) % 64;
> - return err;
> - }
> -
> - l2cap_ertm_enter_local_busy(chan);
> -
> - bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
> - __skb_queue_tail(&chan->busy_q, skb);
> -
> - queue_work(_busy_wq, &chan->busy_work);
> + chan->buffer_seq = (chan->buffer_seq + 1) % 64;
>
> return err;
> }
>
> +void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
> +{
> + if (chan->mode == L2CAP_MODE_ERTM) {
> + if (busy)
> + l2cap_ertm_enter_local_busy(chan);
> + else
> + l2cap_ertm_exit_local_busy(chan);
> + }
> +}
> +
> static int l2cap_streaming_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
> {
> struct sk_buff *_skb;
> @@ -3463,13 +3463,22 @@ static void l2cap_check_srej_gap(struct l2cap_chan *chan, u8 tx_seq)
> struct sk_buff *skb;
> u16 control;
>
> - while ((skb = skb_peek(&chan->srej_q))) {
> + while ((skb = skb_peek(&chan->srej_q)) &&
> + !test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
> + int err;
> +
> if (bt_cb(skb)->tx_seq != tx_seq)
> break;
>
> skb = skb_dequeue(&chan->srej_q);
> control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
> - l2cap_ertm_reassembly_sdu(chan, skb, control);
> + err = l2cap_ertm_reassembly_sdu(chan, skb, control);
> +
> + if (err < 0) {
> + l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
> + break;
> + }
> +
> chan->buffer_seq_srej =
> (chan->buffer_seq_srej + 1) % 64;
> tx_seq = (tx_seq + 1) % 64;
> @@ -3625,8 +3634,10 @@ expected:
> }
>
> err = l2cap_push_rx_skb(chan, skb, rx_control);
> - if (err < 0)
> - return 0;
> + if (err < 0) {
> + l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
> + return err;
> + }
>
> if (rx_control & L2CAP_CTRL_FINAL) {
> if (!test_and_clear_bit(CONN_REJ_ACT, &chan->conn_state))
> diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c
> index 39082d4..9bf7313 100644
> --- a/net/bluetooth/l2cap_sock.c
> +++ b/net/bluetooth/l2cap_sock.c
> @@ -711,13 +711,15 @@ static int l2cap_sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct ms
> static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t len, int flags)
> {
> struct sock *sk = sock->sk;
> + struct l2cap_pinfo *pi = l2cap_pi(sk);
> + int err;
>
> lock_sock(sk);
>
> if (sk->sk_state == BT_CONNECT2 && bt_sk(sk)->defer_setup) {
> sk->sk_state = BT_CONFIG;
>
> - __l2cap_connect_rsp_defer(l2cap_pi(sk)->chan);
> + __l2cap_connect_rsp_defer(pi->chan);
> release_sock(sk);
> return 0;
> }
> @@ -725,9 +727,38 @@ static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct ms
> release_sock(sk);
>
> if (sock->type == SOCK_STREAM)
> - return bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
> + err = bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
> + else
> + err = bt_sock_recvmsg(iocb, sock, msg, len, flags);
> +
> + if (pi->chan->mode == L2CAP_MODE_ERTM) {

if (pi->chan->mode != L2CAP_MODE_ERTM) {
return err;

lock_sock()...

> + int threshold;
> +
> + /* Attempt to put pending rx data in the socket buffer */
> +
> + lock_sock(sk);
> + if (pi->rx_busy_skb) {
> + int queue_err;
> + queue_err = sock_queue_rcv_skb(sk, pi->rx_busy_skb);
> +
> + if (!queue_err)
> + pi->rx_busy_skb = NULL;

if (!sock_queue_rcv_skb())
pi->rx_busy_skb = NULL;
else
return err;

What I understood is that if sock_queue_rcv_skb() fails there is no need to
check if there is available space in the buffer, then we just return. The
locks also needs to be released.

> + }
> +
> + /* Restore data flow when half of the receive buffer is
> + * available. This avoids resending large numbers of
> + * frames.
> + */
> + threshold = sk->sk_rcvbuf >> 1;
> + if (test_bit(CONN_LOCAL_BUSY, &pi->chan->conn_state) &&
> + !pi->rx_busy_skb &&
> + atomic_read(&sk->sk_rmem_alloc) <= threshold)
> + l2cap_chan_busy(pi->chan, 0);

Then this if can be just

if (atomic_read(sk_rmem_alloc) <= sk_rcvbuf >> 1)
l2cap_chan_busy(pi->chan, 0);

> +
> + release_sock(sk);
> + }
>
> - return bt_sock_recvmsg(iocb, sock, msg, len, flags);
> + return err;
> }
>
> /* Kill socket (only if zapped and orphan)
> @@ -811,9 +842,31 @@ static struct l2cap_chan *l2cap_sock_new_connection_cb(void *data)
>
> static int l2cap_sock_recv_cb(void *data, struct sk_buff *skb)
> {
> + int err;
> struct sock *sk = data;
> + struct l2cap_pinfo *pi = l2cap_pi(sk);
> +
> + if (pi->rx_busy_skb)
> + return -ENOMEM;
>
> - return sock_queue_rcv_skb(sk, skb);
> + err = sock_queue_rcv_skb(sk, skb);
> +
> + /* For ERTM, handle one skb that doesn't fit into the recv
> + * buffer. This is important to do because the data frames
> + * have already been acked, so the skb cannot be discarded.
> + *
> + * Notify the l2cap core that the buffer is full, so the
> + * LOCAL_BUSY state is entered and no more frames are
> + * acked and reassembled until there is buffer space
> + * available.
> + */
> + if (err < 0 && pi->chan->mode == L2CAP_MODE_ERTM) {
> + pi->rx_busy_skb = skb;
> + l2cap_chan_busy(pi->chan, 1);
> + err = 0;
> + }
> +
> + return err;

You always return 0 here, remove err;

Gustavo

2011-07-06 21:54:32

by Mat Martineau

[permalink] [raw]
Subject: [PATCH v2 3/3] Bluetooth: Remove L2CAP busy queue

The ERTM receive buffer is now handled in a way that does not require
the busy queue and the associated polling code.

Signed-off-by: Mat Martineau <[email protected]>
---
include/net/bluetooth/l2cap.h | 4 -
net/bluetooth/l2cap_core.c | 125 +++--------------------------------------
2 files changed, 8 insertions(+), 121 deletions(-)

diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
index 66b8d96..578545a 100644
--- a/include/net/bluetooth/l2cap.h
+++ b/include/net/bluetooth/l2cap.h
@@ -37,7 +37,6 @@
#define L2CAP_DEFAULT_MONITOR_TO 12000 /* 12 seconds */
#define L2CAP_DEFAULT_MAX_PDU_SIZE 1009 /* Sized for 3-DH5 packet */
#define L2CAP_DEFAULT_ACK_TO 200
-#define L2CAP_LOCAL_BUSY_TRIES 12
#define L2CAP_LE_DEFAULT_MTU 23

#define L2CAP_CONN_TIMEOUT (40000) /* 40 seconds */
@@ -352,8 +351,6 @@ struct l2cap_chan {
struct sk_buff *tx_send_head;
struct sk_buff_head tx_q;
struct sk_buff_head srej_q;
- struct sk_buff_head busy_q;
- struct work_struct busy_work;
struct list_head srej_l;

struct list_head list;
@@ -450,7 +447,6 @@ enum {
CONN_REJ_ACT,
CONN_SEND_FBIT,
CONN_RNR_SENT,
- CONN_SAR_RETRY,
};

#define __set_chan_timer(c, t) l2cap_set_timer(c, &c->chan_timer, (t))
diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
index ea9c7d0..2c5d335 100644
--- a/net/bluetooth/l2cap_core.c
+++ b/net/bluetooth/l2cap_core.c
@@ -61,13 +61,9 @@ int disable_ertm;
static u32 l2cap_feat_mask = L2CAP_FEAT_FIXED_CHAN;
static u8 l2cap_fixed_chan[8] = { 0x02, };

-static struct workqueue_struct *_busy_wq;
-
static LIST_HEAD(chan_list);
static DEFINE_RWLOCK(chan_list_lock);

-static void l2cap_busy_work(struct work_struct *work);
-
static struct sk_buff *l2cap_build_cmd(struct l2cap_conn *conn,
u8 code, u8 ident, u16 dlen, void *data);
static void l2cap_send_cmd(struct l2cap_conn *conn, u8 ident, u8 code, u16 len,
@@ -395,7 +391,6 @@ static void l2cap_chan_del(struct l2cap_chan *chan, int err)
__clear_ack_timer(chan);

skb_queue_purge(&chan->srej_q);
- skb_queue_purge(&chan->busy_q);

list_for_each_entry_safe(l, tmp, &chan->srej_l, list) {
list_del(&l->list);
@@ -1873,11 +1868,9 @@ static inline void l2cap_ertm_init(struct l2cap_chan *chan)
setup_timer(&chan->ack_timer, l2cap_ack_timeout, (unsigned long) chan);

skb_queue_head_init(&chan->srej_q);
- skb_queue_head_init(&chan->busy_q);

INIT_LIST_HEAD(&chan->srej_l);

- INIT_WORK(&chan->busy_work, l2cap_busy_work);

sk->sk_backlog_rcv = l2cap_ertm_data_rcv;
}
@@ -3182,32 +3175,27 @@ static int l2cap_ertm_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *sk
if (!chan->sdu)
goto disconnect;

- if (!test_bit(CONN_SAR_RETRY, &chan->conn_state)) {
- chan->partial_sdu_len += skb->len;
+ chan->partial_sdu_len += skb->len;

- if (chan->partial_sdu_len > chan->imtu)
- goto drop;
+ if (chan->partial_sdu_len > chan->imtu)
+ goto drop;

- if (chan->partial_sdu_len != chan->sdu_len)
- goto drop;
+ if (chan->partial_sdu_len != chan->sdu_len)
+ goto drop;

- memcpy(skb_put(chan->sdu, skb->len), skb->data, skb->len);
- }
+ memcpy(skb_put(chan->sdu, skb->len), skb->data, skb->len);

_skb = skb_clone(chan->sdu, GFP_ATOMIC);
if (!_skb) {
- set_bit(CONN_SAR_RETRY, &chan->conn_state);
return -ENOMEM;
}

err = chan->ops->recv(chan->data, _skb);
if (err < 0) {
kfree_skb(_skb);
- set_bit(CONN_SAR_RETRY, &chan->conn_state);
return err;
}

- clear_bit(CONN_SAR_RETRY, &chan->conn_state);
clear_bit(CONN_SAR_SDU, &chan->conn_state);

kfree_skb(chan->sdu);
@@ -3268,93 +3256,6 @@ done:
BT_DBG("chan %p, Exit local busy", chan);
}

-static int l2cap_try_push_rx_skb(struct l2cap_chan *chan)
-{
- struct sk_buff *skb;
- u16 control;
- int err;
-
- while ((skb = skb_dequeue(&chan->busy_q))) {
- control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
- err = l2cap_ertm_reassembly_sdu(chan, skb, control);
- if (err < 0) {
- skb_queue_head(&chan->busy_q, skb);
- return -EBUSY;
- }
-
- chan->buffer_seq = (chan->buffer_seq + 1) % 64;
- }
-
- l2cap_ertm_exit_local_busy(chan);
-
- return 0;
-}
-
-static void l2cap_busy_work(struct work_struct *work)
-{
- DECLARE_WAITQUEUE(wait, current);
- struct l2cap_chan *chan =
- container_of(work, struct l2cap_chan, busy_work);
- struct sock *sk = chan->sk;
- int n_tries = 0, timeo = HZ/5, err;
- struct sk_buff *skb;
-
- lock_sock(sk);
-
- add_wait_queue(sk_sleep(sk), &wait);
- while ((skb = skb_peek(&chan->busy_q))) {
- set_current_state(TASK_INTERRUPTIBLE);
-
- if (n_tries++ > L2CAP_LOCAL_BUSY_TRIES) {
- err = -EBUSY;
- l2cap_send_disconn_req(chan->conn, chan, EBUSY);
- break;
- }
-
- if (!timeo)
- timeo = HZ/5;
-
- if (signal_pending(current)) {
- err = sock_intr_errno(timeo);
- break;
- }
-
- release_sock(sk);
- timeo = schedule_timeout(timeo);
- lock_sock(sk);
-
- err = sock_error(sk);
- if (err)
- break;
-
- if (l2cap_try_push_rx_skb(chan) == 0)
- break;
- }
-
- set_current_state(TASK_RUNNING);
- remove_wait_queue(sk_sleep(sk), &wait);
-
- release_sock(sk);
-}
-
-static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
-{
- int err;
-
- if (test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
- bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
- __skb_queue_tail(&chan->busy_q, skb);
- return l2cap_try_push_rx_skb(chan);
-
-
- }
-
- err = l2cap_ertm_reassembly_sdu(chan, skb, control);
- chan->buffer_seq = (chan->buffer_seq + 1) % 64;
-
- return err;
-}
-
void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
{
if (chan->mode == L2CAP_MODE_ERTM) {
@@ -3612,7 +3513,6 @@ static inline int l2cap_data_channel_iframe(struct l2cap_chan *chan, u16 rx_cont
chan->buffer_seq_srej = chan->buffer_seq;

__skb_queue_head_init(&chan->srej_q);
- __skb_queue_head_init(&chan->busy_q);
l2cap_add_to_srej_queue(chan, skb, tx_seq, sar);

set_bit(CONN_SEND_PBIT, &chan->conn_state);
@@ -3633,7 +3533,8 @@ expected:
return 0;
}

- err = l2cap_push_rx_skb(chan, skb, rx_control);
+ err = l2cap_ertm_reassembly_sdu(chan, skb, rx_control);
+ chan->buffer_seq = (chan->buffer_seq + 1) % 64;
if (err < 0) {
l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
return err;
@@ -4439,12 +4340,6 @@ int __init l2cap_init(void)
if (err < 0)
return err;

- _busy_wq = create_singlethread_workqueue("l2cap");
- if (!_busy_wq) {
- err = -ENOMEM;
- goto error;
- }
-
err = hci_register_proto(&l2cap_hci_proto);
if (err < 0) {
BT_ERR("L2CAP protocol registration failed");
@@ -4462,7 +4357,6 @@ int __init l2cap_init(void)
return 0;

error:
- destroy_workqueue(_busy_wq);
l2cap_cleanup_sockets();
return err;
}
@@ -4471,9 +4365,6 @@ void l2cap_exit(void)
{
debugfs_remove(l2cap_debugfs);

- flush_workqueue(_busy_wq);
- destroy_workqueue(_busy_wq);
-
if (hci_unregister_proto(&l2cap_hci_proto) < 0)
BT_ERR("L2CAP protocol unregistration failed");

--
1.7.6

--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum


2011-07-06 21:54:30

by Mat Martineau

[permalink] [raw]
Subject: [PATCH v2 1/3] Bluetooth: Move code for ERTM local busy state to separate functions

The local busy state is entered and exited based on buffer status in
the socket layer (or other upper layer). This change is in
preparation for general buffer status reports from the socket layer,
which will then be used to change the local busy status.

Signed-off-by: Mat Martineau <[email protected]>
---
net/bluetooth/l2cap_core.c | 62 +++++++++++++++++++++++++++-----------------
1 files changed, 38 insertions(+), 24 deletions(-)

diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
index bd5d992..f7ada4a 100644
--- a/net/bluetooth/l2cap_core.c
+++ b/net/bluetooth/l2cap_core.c
@@ -3227,22 +3227,26 @@ disconnect:
return 0;
}

-static int l2cap_try_push_rx_skb(struct l2cap_chan *chan)
+static void l2cap_ertm_enter_local_busy(struct l2cap_chan *chan)
{
- struct sk_buff *skb;
u16 control;
- int err;

- while ((skb = skb_dequeue(&chan->busy_q))) {
- control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
- err = l2cap_ertm_reassembly_sdu(chan, skb, control);
- if (err < 0) {
- skb_queue_head(&chan->busy_q, skb);
- return -EBUSY;
- }
+ BT_DBG("chan %p, Enter local busy", chan);

- chan->buffer_seq = (chan->buffer_seq + 1) % 64;
- }
+ set_bit(CONN_LOCAL_BUSY, &chan->conn_state);
+
+ control = chan->buffer_seq << L2CAP_CTRL_REQSEQ_SHIFT;
+ control |= L2CAP_SUPER_RCV_NOT_READY;
+ l2cap_send_sframe(chan, control);
+
+ set_bit(CONN_RNR_SENT, &chan->conn_state);
+
+ __clear_ack_timer(chan);
+}
+
+static void l2cap_ertm_exit_local_busy(struct l2cap_chan *chan)
+{
+ u16 control;

if (!test_bit(CONN_RNR_SENT, &chan->conn_state))
goto done;
@@ -3262,6 +3266,26 @@ done:
clear_bit(CONN_RNR_SENT, &chan->conn_state);

BT_DBG("chan %p, Exit local busy", chan);
+}
+
+static int l2cap_try_push_rx_skb(struct l2cap_chan *chan)
+{
+ struct sk_buff *skb;
+ u16 control;
+ int err;
+
+ while ((skb = skb_dequeue(&chan->busy_q))) {
+ control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
+ err = l2cap_ertm_reassembly_sdu(chan, skb, control);
+ if (err < 0) {
+ skb_queue_head(&chan->busy_q, skb);
+ return -EBUSY;
+ }
+
+ chan->buffer_seq = (chan->buffer_seq + 1) % 64;
+ }
+
+ l2cap_ertm_exit_local_busy(chan);

return 0;
}
@@ -3315,7 +3339,7 @@ static void l2cap_busy_work(struct work_struct *work)

static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
{
- int sctrl, err;
+ int err;

if (test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
@@ -3331,21 +3355,11 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
return err;
}

- /* Busy Condition */
- BT_DBG("chan %p, Enter local busy", chan);
+ l2cap_ertm_enter_local_busy(chan);

- set_bit(CONN_LOCAL_BUSY, &chan->conn_state);
bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
__skb_queue_tail(&chan->busy_q, skb);

- sctrl = chan->buffer_seq << L2CAP_CTRL_REQSEQ_SHIFT;
- sctrl |= L2CAP_SUPER_RCV_NOT_READY;
- l2cap_send_sframe(chan, sctrl);
-
- set_bit(CONN_RNR_SENT, &chan->conn_state);
-
- __clear_ack_timer(chan);
-
queue_work(_busy_wq, &chan->busy_work);

return err;
--
1.7.6

--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum


2011-07-06 21:54:31

by Mat Martineau

[permalink] [raw]
Subject: [PATCH v2 2/3] Bluetooth: Use event-driven approach for handling ERTM receive buffer

This change moves most L2CAP ERTM receive buffer handling out of the
L2CAP core and in to the socket code. It's up to the higher layer
(the socket code, in this case) to tell the core when its buffer is
full or has space available. The recv op should always accept
incoming ERTM data or else the connection will go down.

Within the socket layer, an skb that does not fit in the socket
receive buffer will be temporarily stored. When the socket is read
from, that skb will be placed in the receive buffer if possible. Once
adequate buffer space becomes available, the L2CAP core is informed
and the ERTM local busy state is cleared.

Receive buffer management for non-ERTM modes is unchanged.

Signed-off-by: Mat Martineau <[email protected]>
---
include/net/bluetooth/l2cap.h | 2 +
net/bluetooth/l2cap_core.c | 41 ++++++++++++++++---------
net/bluetooth/l2cap_sock.c | 66 ++++++++++++++++++++++++++++++++++++++--
3 files changed, 90 insertions(+), 19 deletions(-)

diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
index 9c18e55..66b8d96 100644
--- a/include/net/bluetooth/l2cap.h
+++ b/include/net/bluetooth/l2cap.h
@@ -422,6 +422,7 @@ struct l2cap_conn {
struct l2cap_pinfo {
struct bt_sock bt;
struct l2cap_chan *chan;
+ struct sk_buff *rx_busy_skb;
};

enum {
@@ -498,5 +499,6 @@ void l2cap_chan_close(struct l2cap_chan *chan, int reason);
void l2cap_chan_destroy(struct l2cap_chan *chan);
int l2cap_chan_connect(struct l2cap_chan *chan);
int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len);
+void l2cap_chan_busy(struct l2cap_chan *chan, int busy);

#endif /* __L2CAP_H */
diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
index f7ada4a..ea9c7d0 100644
--- a/net/bluetooth/l2cap_core.c
+++ b/net/bluetooth/l2cap_core.c
@@ -3350,21 +3350,21 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
}

err = l2cap_ertm_reassembly_sdu(chan, skb, control);
- if (err >= 0) {
- chan->buffer_seq = (chan->buffer_seq + 1) % 64;
- return err;
- }
-
- l2cap_ertm_enter_local_busy(chan);
-
- bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
- __skb_queue_tail(&chan->busy_q, skb);
-
- queue_work(_busy_wq, &chan->busy_work);
+ chan->buffer_seq = (chan->buffer_seq + 1) % 64;

return err;
}

+void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
+{
+ if (chan->mode == L2CAP_MODE_ERTM) {
+ if (busy)
+ l2cap_ertm_enter_local_busy(chan);
+ else
+ l2cap_ertm_exit_local_busy(chan);
+ }
+}
+
static int l2cap_streaming_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
{
struct sk_buff *_skb;
@@ -3463,13 +3463,22 @@ static void l2cap_check_srej_gap(struct l2cap_chan *chan, u8 tx_seq)
struct sk_buff *skb;
u16 control;

- while ((skb = skb_peek(&chan->srej_q))) {
+ while ((skb = skb_peek(&chan->srej_q)) &&
+ !test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
+ int err;
+
if (bt_cb(skb)->tx_seq != tx_seq)
break;

skb = skb_dequeue(&chan->srej_q);
control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
- l2cap_ertm_reassembly_sdu(chan, skb, control);
+ err = l2cap_ertm_reassembly_sdu(chan, skb, control);
+
+ if (err < 0) {
+ l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
+ break;
+ }
+
chan->buffer_seq_srej =
(chan->buffer_seq_srej + 1) % 64;
tx_seq = (tx_seq + 1) % 64;
@@ -3625,8 +3634,10 @@ expected:
}

err = l2cap_push_rx_skb(chan, skb, rx_control);
- if (err < 0)
- return 0;
+ if (err < 0) {
+ l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
+ return err;
+ }

if (rx_control & L2CAP_CTRL_FINAL) {
if (!test_and_clear_bit(CONN_REJ_ACT, &chan->conn_state))
diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c
index 39082d4..9bf7313 100644
--- a/net/bluetooth/l2cap_sock.c
+++ b/net/bluetooth/l2cap_sock.c
@@ -711,13 +711,15 @@ static int l2cap_sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct ms
static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t len, int flags)
{
struct sock *sk = sock->sk;
+ struct l2cap_pinfo *pi = l2cap_pi(sk);
+ int err;

lock_sock(sk);

if (sk->sk_state == BT_CONNECT2 && bt_sk(sk)->defer_setup) {
sk->sk_state = BT_CONFIG;

- __l2cap_connect_rsp_defer(l2cap_pi(sk)->chan);
+ __l2cap_connect_rsp_defer(pi->chan);
release_sock(sk);
return 0;
}
@@ -725,9 +727,38 @@ static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct ms
release_sock(sk);

if (sock->type == SOCK_STREAM)
- return bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
+ err = bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
+ else
+ err = bt_sock_recvmsg(iocb, sock, msg, len, flags);
+
+ if (pi->chan->mode == L2CAP_MODE_ERTM) {
+ int threshold;
+
+ /* Attempt to put pending rx data in the socket buffer */
+
+ lock_sock(sk);
+ if (pi->rx_busy_skb) {
+ int queue_err;
+ queue_err = sock_queue_rcv_skb(sk, pi->rx_busy_skb);
+
+ if (!queue_err)
+ pi->rx_busy_skb = NULL;
+ }
+
+ /* Restore data flow when half of the receive buffer is
+ * available. This avoids resending large numbers of
+ * frames.
+ */
+ threshold = sk->sk_rcvbuf >> 1;
+ if (test_bit(CONN_LOCAL_BUSY, &pi->chan->conn_state) &&
+ !pi->rx_busy_skb &&
+ atomic_read(&sk->sk_rmem_alloc) <= threshold)
+ l2cap_chan_busy(pi->chan, 0);
+
+ release_sock(sk);
+ }

- return bt_sock_recvmsg(iocb, sock, msg, len, flags);
+ return err;
}

/* Kill socket (only if zapped and orphan)
@@ -811,9 +842,31 @@ static struct l2cap_chan *l2cap_sock_new_connection_cb(void *data)

static int l2cap_sock_recv_cb(void *data, struct sk_buff *skb)
{
+ int err;
struct sock *sk = data;
+ struct l2cap_pinfo *pi = l2cap_pi(sk);
+
+ if (pi->rx_busy_skb)
+ return -ENOMEM;

- return sock_queue_rcv_skb(sk, skb);
+ err = sock_queue_rcv_skb(sk, skb);
+
+ /* For ERTM, handle one skb that doesn't fit into the recv
+ * buffer. This is important to do because the data frames
+ * have already been acked, so the skb cannot be discarded.
+ *
+ * Notify the l2cap core that the buffer is full, so the
+ * LOCAL_BUSY state is entered and no more frames are
+ * acked and reassembled until there is buffer space
+ * available.
+ */
+ if (err < 0 && pi->chan->mode == L2CAP_MODE_ERTM) {
+ pi->rx_busy_skb = skb;
+ l2cap_chan_busy(pi->chan, 1);
+ err = 0;
+ }
+
+ return err;
}

static void l2cap_sock_close_cb(void *data)
@@ -842,6 +895,11 @@ static void l2cap_sock_destruct(struct sock *sk)
{
BT_DBG("sk %p", sk);

+ if (l2cap_pi(sk)->rx_busy_skb) {
+ kfree_skb(l2cap_pi(sk)->rx_busy_skb);
+ l2cap_pi(sk)->rx_busy_skb = NULL;
+ }
+
skb_queue_purge(&sk->sk_receive_queue);
skb_queue_purge(&sk->sk_write_queue);
}
--
1.7.6

--
Mat Martineau
Employee of Qualcomm Innovation Center, Inc.
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum