2010-11-04 08:55:30

by Radoslaw Jablonski

[permalink] [raw]
Subject: [PATCH v3] Add support for sending small data through obex

Added handling packets smaller than mtu in obex_write_stream. Now trying
to read from source until mtu will be filled properly and not sending
immediately data if it is smaller than mtu.
---
src/obex.c | 51 +++++++++++++++++++++++++++++++++++++++------------
1 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/src/obex.c b/src/obex.c
index 6d4430d..4cbc1f8 100644
--- a/src/obex.c
+++ b/src/obex.c
@@ -622,7 +622,7 @@ static int obex_write_stream(struct obex_session *os,
{
obex_headerdata_t hd;
uint8_t *ptr;
- ssize_t len;
+ ssize_t len, r_len;
unsigned int flags;
uint8_t hi;

@@ -642,18 +642,39 @@ static int obex_write_stream(struct obex_session *os,
goto add_header;
}

- len = os->driver->read(os->object, os->buf, os->tx_mtu, &hi);
- if (len < 0) {
- error("read(): %s (%zd)", strerror(-len), -len);
- if (len == -EAGAIN)
- return len;
- else if (len == -ENOSTR)
- return 0;
+ /* Copying data from source until we reach end of the stream. Sending
+ * data only if MTU will be filled in 100% or we reach end of data.
+ * Remaining data in buffer will be sent with next amount of data
+ * from source.*/
+ do {
+ r_len = os->driver->read(os->object, os->buf + os->pending,
+ os->tx_mtu - os->pending, &hi);

- g_free(os->buf);
- os->buf = NULL;
- return len;
- }
+ if (r_len == 0)
+ break;
+ else if (r_len < 0) {
+ error("read(): %s (%zd)", strerror(-r_len), -r_len);
+
+ switch (r_len) {
+ case -EAGAIN:
+ return r_len;
+ case -EINTR:
+ continue;
+ case -ENOSTR:
+ return 0;
+ default:
+ g_free(os->buf);
+ os->buf = NULL;
+ return r_len;
+ }
+ }
+
+ /* Saving amount of data accumulated in obex buffer */
+ os->pending += r_len;
+ } while (os->pending < os->tx_mtu);
+
+ len = os->pending;
+ os->pending = 0;

ptr = os->buf;

@@ -702,6 +723,12 @@ static gboolean handle_async_io(void *object, int flags, int err,
ret = obex_read_stream(os, os->obex, os->obj);

proceed:
+
+ /* Returning TRUE to not delete current watcher - it need to be active
+ * to handle next io flag changes (more data will be available later)*/
+ if (ret == -EAGAIN)
+ return TRUE;
+
if (ret < 0) {
os_set_response(os->obj, err);
OBEX_CancelRequest(os->obex, TRUE);
--
1.7.0.4



2010-11-04 12:20:53

by Luiz Augusto von Dentz

[permalink] [raw]
Subject: Re: [PATCH v2] Add support for sending large PBAP response in many parts

Hi,

On Thu, Nov 4, 2010 at 10:55 AM, Radoslaw Jablonski
<[email protected]> wrote:
> Added file buffer to cache pull results - temporary file will be deleted
> when response is sent. Also added partial_resp variable to pbap_session
> for holding information if more data will be available from source later.
> It was needed to know when sent -EAGAIN to obex, if currently is no data
> available in the buffer.
> ---
> ?plugins/pbap.c | ? 82 +++++++++++++++++++++++++++++++++++++++++++++++++-------
> ?1 files changed, 72 insertions(+), 10 deletions(-)
>
> diff --git a/plugins/pbap.c b/plugins/pbap.c
> index 3ea7d6b..e59ce8d 100644
> --- a/plugins/pbap.c
> +++ b/plugins/pbap.c
> @@ -116,6 +116,8 @@
> ? </attribute> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? \
> ?</record>"
>
> +#define PBAP_BUF_TEMPLATE "pbap_pullXXXXXX"
> +
> ?struct aparam_header {
> ? ? ? ?uint8_t tag;
> ? ? ? ?uint8_t len;
> @@ -143,6 +145,10 @@ struct pbap_session {
> ? ? ? ?uint32_t find_handle;
> ? ? ? ?GString *buffer;
> ? ? ? ?struct cache cache;
> + ? ? ? gboolean partial_resp;
> + ? ? ? int fbuf_w;
> + ? ? ? int fbuf_r;
> + ? ? ? char *buf_path;
> ?};
>
> ?static const uint8_t PBAP_TARGET[TARGET_SIZE] = {
> @@ -256,13 +262,28 @@ static void query_result(const char *buffer, size_t bufsize, int vcards,
> ? ? ? ? ? ? ? ?return;
> ? ? ? ?}
>
> - ? ? ? if (!pbap->buffer)
> - ? ? ? ? ? ? ? pbap->buffer = g_string_new_len(buffer, bufsize);
> - ? ? ? else
> - ? ? ? ? ? ? ? pbap->buffer = g_string_append_len(pbap->buffer, buffer,
> - ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? bufsize);
> + ? ? ? if (!pbap->fbuf_w) {
> + ? ? ? ? ? ? ? /* Creating file buffer for results*/
> + ? ? ? ? ? ? ? pbap->buf_path = g_build_filename(g_get_tmp_dir(),
> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? PBAP_BUF_TEMPLATE, NULL);
> + ? ? ? ? ? ? ? pbap->fbuf_w = g_mkstemp(pbap->buf_path);
> +
> + ? ? ? ? ? ? ? if (pbap->fbuf_w < 0)
> + ? ? ? ? ? ? ? ? ? ? ? return -EPERM;
> + ? ? ? }

When checking if fd is valid doing < 0

> +
> + ? ? ? write(pbap->fbuf_w, buffer, bufsize);
> +
> + ? ? ? /* If partial_resp will be set to TRUE, then we won't end transmission
> + ? ? ? ?* after sending one part of results to the client via obex*/
> + ? ? ? pbap->partial_resp = missed ? TRUE : FALSE;
> +
> + ? ? ? /* If no more data in future, we close file buffer right now*/
> + ? ? ? if(!pbap->partial_resp)
> + ? ? ? ? ? ? ? close(pbap->fbuf_w);

When closing the fd set it to -1.

> ? ? ? ?obex_object_set_io_flags(pbap, G_IO_IN, 0);
> + ? ? ? DBG("Query result end...");
> ?}
>
> ?static void cache_entry_notify(const char *id, uint32_t handle,
> @@ -829,6 +850,27 @@ fail:
> ? ? ? ?return NULL;
> ?}
>
> +static ssize_t pbap_read_fbuf(struct pbap_session *pbap, void *buf,
> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? size_t count)
> +{
> + ? ? ? ssize_t len;
> +
> + ? ? ? if (!pbap->fbuf_r) {
> + ? ? ? ? ? ? ? pbap->fbuf_r = open(pbap->buf_path, 0);
> +
> + ? ? ? ? ? ? ? if(pbap->fbuf_r < 0)
> + ? ? ? ? ? ? ? ? ? ? ? return -EPERM;
> + ? ? ? }

Again use negative values to check if fd was already opened.

> + ? ? ? len = read(pbap->fbuf_r, buf, count);
> +
> + ? ? ? if (len == 0 && pbap->partial_resp)
> + ? ? ? ? ? ? ? /* More data available later */
> + ? ? ? ? ? ? ? return -EAGAIN;
> + ? ? ? else
> + ? ? ? ? ? ? ? return len;
> +}
> +
> ?static ssize_t vobject_pull_read(void *object, void *buf, size_t count,
> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?uint8_t *hi)
> ?{
> @@ -837,17 +879,23 @@ static ssize_t vobject_pull_read(void *object, void *buf, size_t count,
> ? ? ? ?DBG("buffer %p maxlistcount %d", pbap->buffer,
> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?pbap->params->maxlistcount);
>
> - ? ? ? if (!pbap->buffer)
> + ? ? ? if (!pbap->fbuf_w && !pbap->buffer)
> + ? ? ? ? ? ? ? /* No response in buffers now */
> ? ? ? ? ? ? ? ?return -EAGAIN;

Again use negative values.

> - ? ? ? /* PhoneBookSize */
> - ? ? ? if (pbap->params->maxlistcount == 0)
> + ? ? ? /* Result from pb size query is very short (only number) so it makes no
> + ? ? ? ?* sense to create file buffer for it - using memory buff */
> + ? ? ? if (pbap->params->maxlistcount == 0) {
> + ? ? ? ? ? ? ? /* PhoneBookSize */
> ? ? ? ? ? ? ? ?*hi = OBEX_HDR_APPARAM;
> - ? ? ? else
> +
> + ? ? ? ? ? ? ? return string_read(pbap->buffer, buf, count);
> + ? ? ? } else {
> ? ? ? ? ? ? ? ?/* Stream data */
> ? ? ? ? ? ? ? ?*hi = OBEX_HDR_BODY;
>
> - ? ? ? return string_read(pbap->buffer, buf, count);
> + ? ? ? ? ? ? ? return pbap_read_fbuf(pbap, buf, count);
> + ? ? ? }
> ?}
>
> ?static ssize_t vobject_list_read(void *object, void *buf, size_t count,
> @@ -893,6 +941,20 @@ static int vobject_close(void *object)
> ? ? ? ? ? ? ? ?pbap->buffer = NULL;
> ? ? ? ?}
>
> + ? ? ? if (pbap->fbuf_r)
> + ? ? ? ? ? ? ? close(pbap->fbuf_r);
> +
> + ? ? ? if (pbap->fbuf_w)
> + ? ? ? ? ? ? ? close(pbap->fbuf_w);

When you close a fd reset it to -1.

> + ? ? ? if (pbap->buf_path) {
> + ? ? ? ? ? ? ? /* remove file buffer for pull queries */
> + ? ? ? ? ? ? ? unlink(pbap->buf_path);
> +
> + ? ? ? ? ? ? ? g_free(pbap->buf_path);
> + ? ? ? ? ? ? ? pbap->buf_path = NULL;
> + ? ? ? }
> +
> ? ? ? ?return 0;
> ?}


--
Luiz Augusto von Dentz
Computer Engineer

2010-11-04 08:55:31

by Radoslaw Jablonski

[permalink] [raw]
Subject: [PATCH v2] Add support for sending large PBAP response in many parts

Added file buffer to cache pull results - temporary file will be deleted
when response is sent. Also added partial_resp variable to pbap_session
for holding information if more data will be available from source later.
It was needed to know when sent -EAGAIN to obex, if currently is no data
available in the buffer.
---
plugins/pbap.c | 82 +++++++++++++++++++++++++++++++++++++++++++++++++-------
1 files changed, 72 insertions(+), 10 deletions(-)

diff --git a/plugins/pbap.c b/plugins/pbap.c
index 3ea7d6b..e59ce8d 100644
--- a/plugins/pbap.c
+++ b/plugins/pbap.c
@@ -116,6 +116,8 @@
</attribute> \
</record>"

+#define PBAP_BUF_TEMPLATE "pbap_pullXXXXXX"
+
struct aparam_header {
uint8_t tag;
uint8_t len;
@@ -143,6 +145,10 @@ struct pbap_session {
uint32_t find_handle;
GString *buffer;
struct cache cache;
+ gboolean partial_resp;
+ int fbuf_w;
+ int fbuf_r;
+ char *buf_path;
};

static const uint8_t PBAP_TARGET[TARGET_SIZE] = {
@@ -256,13 +262,28 @@ static void query_result(const char *buffer, size_t bufsize, int vcards,
return;
}

- if (!pbap->buffer)
- pbap->buffer = g_string_new_len(buffer, bufsize);
- else
- pbap->buffer = g_string_append_len(pbap->buffer, buffer,
- bufsize);
+ if (!pbap->fbuf_w) {
+ /* Creating file buffer for results*/
+ pbap->buf_path = g_build_filename(g_get_tmp_dir(),
+ PBAP_BUF_TEMPLATE, NULL);
+ pbap->fbuf_w = g_mkstemp(pbap->buf_path);
+
+ if (pbap->fbuf_w < 0)
+ return -EPERM;
+ }
+
+ write(pbap->fbuf_w, buffer, bufsize);
+
+ /* If partial_resp will be set to TRUE, then we won't end transmission
+ * after sending one part of results to the client via obex*/
+ pbap->partial_resp = missed ? TRUE : FALSE;
+
+ /* If no more data in future, we close file buffer right now*/
+ if(!pbap->partial_resp)
+ close(pbap->fbuf_w);

obex_object_set_io_flags(pbap, G_IO_IN, 0);
+ DBG("Query result end...");
}

static void cache_entry_notify(const char *id, uint32_t handle,
@@ -829,6 +850,27 @@ fail:
return NULL;
}

+static ssize_t pbap_read_fbuf(struct pbap_session *pbap, void *buf,
+ size_t count)
+{
+ ssize_t len;
+
+ if (!pbap->fbuf_r) {
+ pbap->fbuf_r = open(pbap->buf_path, 0);
+
+ if(pbap->fbuf_r < 0)
+ return -EPERM;
+ }
+
+ len = read(pbap->fbuf_r, buf, count);
+
+ if (len == 0 && pbap->partial_resp)
+ /* More data available later */
+ return -EAGAIN;
+ else
+ return len;
+}
+
static ssize_t vobject_pull_read(void *object, void *buf, size_t count,
uint8_t *hi)
{
@@ -837,17 +879,23 @@ static ssize_t vobject_pull_read(void *object, void *buf, size_t count,
DBG("buffer %p maxlistcount %d", pbap->buffer,
pbap->params->maxlistcount);

- if (!pbap->buffer)
+ if (!pbap->fbuf_w && !pbap->buffer)
+ /* No response in buffers now */
return -EAGAIN;

- /* PhoneBookSize */
- if (pbap->params->maxlistcount == 0)
+ /* Result from pb size query is very short (only number) so it makes no
+ * sense to create file buffer for it - using memory buff */
+ if (pbap->params->maxlistcount == 0) {
+ /* PhoneBookSize */
*hi = OBEX_HDR_APPARAM;
- else
+
+ return string_read(pbap->buffer, buf, count);
+ } else {
/* Stream data */
*hi = OBEX_HDR_BODY;

- return string_read(pbap->buffer, buf, count);
+ return pbap_read_fbuf(pbap, buf, count);
+ }
}

static ssize_t vobject_list_read(void *object, void *buf, size_t count,
@@ -893,6 +941,20 @@ static int vobject_close(void *object)
pbap->buffer = NULL;
}

+ if (pbap->fbuf_r)
+ close(pbap->fbuf_r);
+
+ if (pbap->fbuf_w)
+ close(pbap->fbuf_w);
+
+ if (pbap->buf_path) {
+ /* remove file buffer for pull queries */
+ unlink(pbap->buf_path);
+
+ g_free(pbap->buf_path);
+ pbap->buf_path = NULL;
+ }
+
return 0;
}

--
1.7.0.4


2010-11-04 08:55:32

by Radoslaw Jablonski

[permalink] [raw]
Subject: [PATCH v2] Add support for generating pull response in many parts

Now data from tracker is fetched in many smaller parts (instead of one
big query before). This is needed to save memory and to not overload
dbus and tracker when generating query result.
---
plugins/phonebook-tracker.c | 71 ++++++++++++++++++++++++++++++++++++-------
1 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/plugins/phonebook-tracker.c b/plugins/phonebook-tracker.c
index 58f52ab..46ef5fb 100644
--- a/plugins/phonebook-tracker.c
+++ b/plugins/phonebook-tracker.c
@@ -57,6 +57,8 @@
#define COL_ANSWERED 37
#define ADDR_FIELD_AMOUNT 7
#define CONTACT_ID_PREFIX "contact:"
+#define QUERY_LIMIT_FORMAT "%s LIMIT %d OFFSET %d"
+#define QUERY_LIMIT 50

#define CONTACTS_QUERY_ALL \
"SELECT ?v nco:fullname(?c) " \
@@ -650,6 +652,9 @@ struct phonebook_data {
gboolean vcardentry;
const struct apparam_field *params;
GSList *contacts;
+ char *name;
+ int offset;
+ int num_row;
};

struct cache_data {
@@ -1098,6 +1103,12 @@ static void add_affiliation(char **field, const char *value)
*field = g_strdup(value);
}

+static char *gen_partial_query(const char *name, int limit, int offset)
+{
+ return g_strdup_printf(QUERY_LIMIT_FORMAT, name2query(name),
+ limit, offset);
+}
+
static void pull_contacts(char **reply, int num_fields, void *user_data)
{
struct phonebook_data *data = user_data;
@@ -1107,13 +1118,17 @@ static void pull_contacts(char **reply, int num_fields, void *user_data)
GString *vcards;
int last_index, i;
gboolean cdata_present = FALSE;
- char *home_addr, *work_addr;
+ gboolean last_resp = FALSE;
+ char *home_addr, *work_addr, *query;

if (num_fields < 0) {
data->cb(NULL, 0, num_fields, 0, data->user_data);
goto fail;
}

+ data->num_row++;
+ last_index = params->liststartoffset + params->maxlistcount;
+
DBG("reply %p", reply);

if (reply == NULL)
@@ -1145,8 +1160,6 @@ static void pull_contacts(char **reply, int num_fields, void *user_data)

data->index++;

- last_index = params->liststartoffset + params->maxlistcount;
-
if ((data->index <= params->liststartoffset ||
data->index > last_index) &&
params->maxlistcount > 0)
@@ -1222,14 +1235,46 @@ add_numbers:
done:
vcards = gen_vcards(data->contacts, params);

- if (num_fields == 0)
+ /* If tracker returned only empty row - all results already returned */
+ if (num_fields == 0 && data->num_row == 1)
+ last_resp = TRUE;
+
+ /* Check if tracker could return desired number of results - if coldn't,
+ * all results are fetched already and this is last response */
+ if (data->num_row < QUERY_LIMIT)
+ last_resp = TRUE;
+
+ /* Check needed for 'maxlistcount' and 'liststartoffset' parameters */
+ if (data->index > last_index)
+ last_resp = TRUE;
+
+ /* Data won't be send if starting offset has not been achieved (unless
+ * now handling last response from tracker) */
+ if (data->index > params->liststartoffset || last_resp)
+ /* 4th parameter of callback is used to mark if stream should be
+ * closed or more data will be sent*/
data->cb(vcards->str, vcards->len,
- g_slist_length(data->contacts), 0,
- data->user_data);
+ g_slist_length(data->contacts), !last_resp,
+ data->user_data);

+ g_slist_free(data->contacts);data->contacts = NULL;
g_string_free(vcards, TRUE);
+ data->num_row = 0;
+
+ /* Sending query to tracker to get next part of results (only for pull
+ * phonebook queries) */
+ if (!data->vcardentry && !last_resp) {
+ data->offset += QUERY_LIMIT;
+ query = gen_partial_query(data->name, QUERY_LIMIT,
+ data->offset);
+ query_tracker(query, PULL_QUERY_COL_AMOUNT,
+ pull_contacts, data);
+ g_free(query);
+ return;
+ }
fail:
g_slist_free(data->contacts);
+ g_free(data->name);
g_free(data);
}

@@ -1367,18 +1412,18 @@ int phonebook_pull(const char *name, const struct apparam_field *params,
phonebook_cb cb, void *user_data)
{
struct phonebook_data *data;
- const char *query;
+ char *query;
reply_list_foreach_t pull_cb;
- int col_amount;
+ int col_amount, ret;

DBG("name %s", name);

if (params->maxlistcount == 0) {
- query = name2count_query(name);
+ query = g_strdup(name2count_query(name));
col_amount = COUNT_QUERY_COL_AMOUNT;
pull_cb = pull_contacts_size;
} else {
- query = name2query(name);
+ query = gen_partial_query(name, QUERY_LIMIT, 0);
col_amount = PULL_QUERY_COL_AMOUNT;
pull_cb = pull_contacts;
}
@@ -1390,8 +1435,12 @@ int phonebook_pull(const char *name, const struct apparam_field *params,
data->params = params;
data->user_data = user_data;
data->cb = cb;
+ data->name = g_strdup(name);
+
+ ret = query_tracker(query, col_amount, pull_cb, data);
+ g_free(query);

- return query_tracker(query, col_amount, pull_cb, data);
+ return ret;
}

int phonebook_get_entry(const char *folder, const char *id,
--
1.7.0.4