Commit cb607cd7 authored by brendan's avatar brendan
Browse files

Add shout_queuelen to expose the current length in bytes of the write queue.

To do this without having to recalculate, I had to create a queue structure to hold
the linked list itself and its length. Might be worth putting a pointer to the tail
here too.

svn path=/icecast/trunk/libshout/; revision=8144
parent 306b6ea6
......@@ -2,9 +2,13 @@
AUTOMAKE_OPTIONS = foreign
noinst_PROGRAMS = example
noinst_PROGRAMS = example nonblocking
example_SOURCES = example.c
example_LDADD = $(top_builddir)/src/libshout.la
nonblocking_SOURCES = nonblocking.c
nonblocking_LDADD = $(top_builddir)/src/libshout.la
AM_CFLAGS = @XIPH_CFLAGS@
AM_CPPFLAGS = @XIPH_CPPFLAGS@ -I$(top_builddir)/include
/* example.c: Demonstration of the libshout API. */
/* example.c: Demonstration of the libshout API.
* $Id$
*/
#include <stdio.h>
#include <stdlib.h>
......
/* -*- c-basic-offset: 8; -*-
* example.c: Demonstration of the libshout API.
* $Id$
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <shout/shout.h>
int main()
{
shout_t *shout;
char buff[4096];
long read, ret, total;
shout_init();
if (!(shout = shout_new())) {
printf("Could not allocate shout_t\n");
return 1;
}
if (shout_set_host(shout, "127.0.0.1") != SHOUTERR_SUCCESS) {
printf("Error setting hostname: %s\n", shout_get_error(shout));
return 1;
}
if (shout_set_protocol(shout, SHOUT_PROTOCOL_HTTP) != SHOUTERR_SUCCESS) {
printf("Error setting protocol: %s\n", shout_get_error(shout));
return 1;
}
if (shout_set_port(shout, 8000) != SHOUTERR_SUCCESS) {
printf("Error setting port: %s\n", shout_get_error(shout));
return 1;
}
if (shout_set_password(shout, "hackme") != SHOUTERR_SUCCESS) {
printf("Error setting password: %s\n", shout_get_error(shout));
return 1;
}
if (shout_set_mount(shout, "/example.ogg") != SHOUTERR_SUCCESS) {
printf("Error setting mount: %s\n", shout_get_error(shout));
return 1;
}
if (shout_set_user(shout, "source") != SHOUTERR_SUCCESS) {
printf("Error setting user: %s\n", shout_get_error(shout));
return 1;
}
if (shout_set_format(shout, SHOUT_FORMAT_OGG) != SHOUTERR_SUCCESS) {
printf("Error setting user: %s\n", shout_get_error(shout));
return 1;
}
if (shout_set_nonblocking(shout, 1) != SHOUTERR_SUCCESS) {
printf("Error setting non-blocking mode: %s\n", shout_get_error(shout));
return 1;
}
ret = shout_open(shout);
if (ret == SHOUTERR_SUCCESS)
ret = SHOUTERR_CONNECTED;
while (ret == SHOUTERR_BUSY) {
printf("Connection pending. Sleeping...\n");
sleep(1);
ret = shout_get_connected(shout);
}
if (ret == SHOUTERR_CONNECTED) {
printf("Connected to server...\n");
total = 0;
while (1) {
read = fread(buff, 1, sizeof(buff), stdin);
total = total + read;
if (read > 0) {
ret = shout_send(shout, buff, read);
if (ret != SHOUTERR_SUCCESS) {
printf("DEBUG: Send error: %s\n", shout_get_error(shout));
break;
}
} else {
break;
}
if (shout_queuelen(shout) > 0)
printf("DEBUG: queue length: %d\n", shout_queuelen(shout));
shout_sync(shout);
}
} else {
printf("Error connecting: %s\n", shout_get_error(shout));
}
shout_close(shout);
shout_shutdown();
return 0;
}
......@@ -159,12 +159,16 @@ int shout_send(shout_t *self, const unsigned char *data, size_t len);
*/
ssize_t shout_send_raw(shout_t *self, const unsigned char *data, size_t len);
/* return the number of bytes currently on the write queue (only makes sense in
* nonblocking mode). */
ssize_t shout_queuelen(shout_t *self);
/* Puts caller to sleep until it is time to send more data to the server */
void shout_sync(shout_t *self);
/* Amount of time in ms caller should wait before sending again */
int shout_delay(shout_t *self);
/* Sets MP3 metadata.
* Returns:
* SHOUTERR_SUCCESS
......
......@@ -39,10 +39,10 @@
#include "util.h"
/* -- local prototypes -- */
static int queue_data(shout_buf_t **queue, const unsigned char *data, size_t len);
static int queue_data(shout_queue_t *queue, const unsigned char *data, size_t len);
static int queue_str(shout_t *self, const char *str);
static int queue_printf(shout_t *self, const char *fmt, ...);
static void queue_free(shout_buf_t *queue);
static void queue_free(shout_queue_t *queue);
static int send_queue(shout_t *self);
static int get_response(shout_t *self);
static int try_connect (shout_t *self);
......@@ -202,7 +202,7 @@ ssize_t shout_send_raw(shout_t *self, const unsigned char *data, size_t len)
self->error = SHOUTERR_SUCCESS;
/* send immediately if possible (should be the common case) */
if (len && ! self->wqueue) {
if (len && ! self->wqueue.len) {
if ((ret = try_write(self, data, len)) < 0)
return self->error;
if (ret < len) {
......@@ -225,6 +225,15 @@ ssize_t shout_send_raw(shout_t *self, const unsigned char *data, size_t len)
return ret;
}
ssize_t shout_queuelen(shout_t *self)
{
if (!self)
return SHOUTERR_INSANE;
return (ssize_t)self->wqueue.len;
}
void shout_sync(shout_t *self)
{
int64_t sleep;
......@@ -771,7 +780,7 @@ unsigned int shout_get_nonblocking(shout_t *self)
/* -- static function definitions -- */
/* queue data in pages of SHOUT_BUFSIZE bytes */
static int queue_data(shout_buf_t **queue, const unsigned char *data, size_t len)
static int queue_data(shout_queue_t *queue, const unsigned char *data, size_t len)
{
shout_buf_t *buf;
size_t plen;
......@@ -779,13 +788,13 @@ static int queue_data(shout_buf_t **queue, const unsigned char *data, size_t len
if (!len)
return SHOUTERR_SUCCESS;
if (!*queue) {
*queue = calloc(1, sizeof (shout_buf_t));
if (! *queue)
if (!queue->len) {
queue->head = calloc(1, sizeof (shout_buf_t));
if (! queue->head)
return SHOUTERR_MALLOC;
}
for (buf = *queue; buf->next; buf = buf->next);
for (buf = queue->head; buf->next; buf = buf->next);
/* Maybe any added data should be freed if we hit a malloc error?
* Otherwise it'd be impossible to tell where to start requeueing.
......@@ -804,6 +813,7 @@ static int queue_data(shout_buf_t **queue, const unsigned char *data, size_t len
buf->len += plen;
data += plen;
len -= plen;
queue->len += plen;
}
return SHOUTERR_SUCCESS;
......@@ -850,15 +860,16 @@ static int queue_printf(shout_t *self, const char *fmt, ...)
return self->error;
}
static inline void queue_free(shout_buf_t *queue)
static inline void queue_free(shout_queue_t *queue)
{
shout_buf_t *prev;
while (queue) {
prev = queue;
queue = queue->next;
while (queue->head) {
prev = queue->head;
queue->head = queue->head->next;
free(prev);
}
queue->len = 0;
}
static int get_response(shout_t *self)
......@@ -881,7 +892,7 @@ static int get_response(shout_t *self)
/* work from the back looking for \r?\n\r?\n. Anything else means more
* is coming. */
for (queue = self->rqueue; queue->next; queue = queue->next);
for (queue = self->rqueue.head; queue->next; queue = queue->next);
pc = queue->data + queue->len - 1;
blen = queue->len;
while (blen) {
......@@ -1028,20 +1039,21 @@ static int send_queue(shout_t *self)
shout_buf_t *buf;
int ret;
if (!self->wqueue)
if (!self->wqueue.len)
return SHOUTERR_SUCCESS;
buf = self->wqueue;
buf = self->wqueue.head;
while (buf) {
ret = try_write (self, buf->data + buf->pos, buf->len - buf->pos);
if (ret < 0)
return self->error;
buf->pos += ret;
self->wqueue.len -= ret;
if (buf->pos == buf->len) {
self->wqueue = buf->next;
self->wqueue.head = buf->next;
free(buf);
buf = self->wqueue;
buf = self->wqueue.head;
if (buf)
buf->prev = NULL;
} else /* incomplete write */
......@@ -1165,11 +1177,10 @@ static int parse_http_response(shout_t *self)
#endif
/* all this copying! */
hlen = collect_queue(self->rqueue, &header);
hlen = collect_queue(self->rqueue.head, &header);
if (hlen <= 0)
return SHOUTERR_MALLOC;
queue_free(self->rqueue);
self->rqueue = NULL;
queue_free(&self->rqueue);
parser = httpp_create_parser();
httpp_initialize(parser, NULL);
......@@ -1228,10 +1239,9 @@ static int parse_xaudiocast_response(shout_t *self)
{
char *response;
if (collect_queue(self->rqueue, &response) <= 0)
if (collect_queue(self->rqueue.head, &response) <= 0)
return SHOUTERR_MALLOC;
queue_free(self->rqueue);
self->rqueue = NULL;
queue_free(&self->rqueue);
if (!strstr(response, "OK")) {
free(response);
......
......@@ -41,6 +41,11 @@ typedef struct _shout_buf {
struct _shout_buf *next;
} shout_buf_t;
typedef struct {
shout_buf_t *head;
size_t len;
} shout_queue_t;
typedef enum {
SHOUT_STATE_UNCONNECTED = 0,
SHOUT_STATE_CONNECT_PENDING,
......@@ -91,8 +96,8 @@ struct shout {
int (*send)(shout_t* self, const unsigned char* buff, size_t len);
void (*close)(shout_t* self);
shout_buf_t *rqueue;
shout_buf_t *wqueue;
shout_queue_t rqueue;
shout_queue_t wqueue;
/* start of this period's timeclock */
uint64_t starttime;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment