Commit 0feb4c3f authored by brendan's avatar brendan
Browse files

Create read queue for buffering response header.

Buffer response header in HTTP login (we're very close to ready for
  non-blocking I/O now).

svn path=/icecast/trunk/libshout/; revision=7338
parent 01cf648f
......@@ -39,9 +39,12 @@
#include "util.h"
/* -- local prototypes -- */
static int queue_data(shout_t *self, const unsigned char *data, size_t len);
static int queue_data(shout_buf_t **queue, const unsigned char *data, size_t len);
static int queue_str(shout_t *self, const char *str);
static int queue_printf(shout_t *self, const char *fmt, ...);
static void queue_free(shout_buf_t *queue);
static int send_queue(shout_t *self);
static int get_response(shout_t *self);
static int try_write (shout_t *self, const void *data, size_t len);
static int login_xaudiocast(shout_t *self);
......@@ -761,7 +764,7 @@ unsigned int shout_get_protocol(shout_t *self)
/* -- static function definitions -- */
/* queue data in pages of SHOUT_BUFSIZE bytes */
static int queue_data(shout_t *self, const unsigned char *data, size_t len)
static int queue_data(shout_buf_t **queue, const unsigned char *data, size_t len)
{
shout_buf_t *buf;
size_t plen;
......@@ -769,13 +772,13 @@ static int queue_data(shout_t *self, const unsigned char *data, size_t len)
if (!len)
return SHOUTERR_SUCCESS;
if (!self->queue) {
self->queue = calloc(1, sizeof (shout_buf_t));
if (! self->queue)
if (!*queue) {
*queue = calloc(1, sizeof (shout_buf_t));
if (! *queue)
return SHOUTERR_MALLOC;
}
for (buf = self->queue; buf->next; buf = buf->next);
for (buf = *queue; buf->next; buf = buf->next);
/* Maybe any added data should be freed if we hit a malloc error?
* Otherwise it'd be impossible to tell where to start requeueing.
......@@ -785,6 +788,7 @@ static int queue_data(shout_t *self, const unsigned char *data, size_t len)
buf->next = calloc(1, sizeof (shout_buf_t));
if (! buf->next)
return SHOUTERR_MALLOC;
buf->next->prev = buf;
buf = buf->next;
}
......@@ -800,7 +804,7 @@ static int queue_data(shout_t *self, const unsigned char *data, size_t len)
static inline int queue_str(shout_t *self, const char *str)
{
return queue_data(self, str, strlen(str));
return queue_data(&self->wqueue, str, strlen(str));
}
/* this should be shared with sock_write. Create libicecommon. */
......@@ -821,12 +825,12 @@ static int queue_printf(shout_t *self, const char *fmt, ...)
self->error = SHOUTERR_SUCCESS;
if (len > 0) {
if ((size_t)len < sizeof(buffer))
queue_data(self, buf, len);
queue_data(&self->wqueue, buf, len);
else {
buf = malloc(++len);
if (buf) {
len = vsnprintf(buf, len, fmt, ap_retry);
queue_data(self, buf, len);
queue_data(&self->wqueue, buf, len);
free(buf);
} else
self->error = SHOUTERR_MALLOC;
......@@ -839,6 +843,62 @@ static int queue_printf(shout_t *self, const char *fmt, ...)
return self->error;
}
static inline void queue_free(shout_buf_t *queue)
{
shout_buf_t *prev;
while (queue) {
prev = queue;
queue = queue->next;
free(prev);
}
}
static int get_response(shout_t *self)
{
char buf[1024];
int rc, blen;
char *pc;
shout_buf_t *queue;
int newlines = 0;
rc = sock_read_bytes(self->socket, buf, sizeof(buf));
if (rc < 0 && sock_recoverable(rc))
return SHOUTERR_BUSY;
if (!rc)
return SHOUTERR_SOCKET;
if ((rc = queue_data(&self->rqueue, buf, rc)))
return rc;
/* work from the back looking for \r?\n\r?\n. Anything else means more
* is coming. */
for (queue = self->rqueue; queue->next; queue = queue->next);
pc = queue->data + queue->len - 1;
blen = queue->len;
while (blen) {
if (*pc == '\n')
newlines++;
else if (*pc != '\r')
break;
if (newlines == 2)
return SHOUTERR_SUCCESS;
blen--;
pc--;
if (!blen && queue->prev) {
queue = queue->prev;
pc = queue->data + queue->len - 1;
blen = queue->len;
}
}
return SHOUTERR_BUSY;
}
static int try_write (shout_t *self, const void *data, size_t len)
{
int ret = sock_write_bytes (self->socket, data, len);
......@@ -855,15 +915,36 @@ static int try_write (shout_t *self, const void *data, size_t len)
return ret;
}
/* collect nodes of a queue into a single buffer */
static int collect_queue(shout_buf_t *queue, char **buf)
{
shout_buf_t *node;
int pos = 0;
int len = 0;
for (node = queue; node; node = node->next)
len += node->len;
if (!(*buf = malloc(len)))
return SHOUTERR_MALLOC;
for (node = queue; node; node = node->next) {
memcpy(*buf + pos, node->data, node->len);
pos += node->len;
}
return len;
}
static int send_queue(shout_t *self)
{
shout_buf_t *buf;
int ret;
if (!self->queue)
if (!self->wqueue)
return 0;
buf = self->queue;
buf = self->wqueue;
while (buf) {
ret = try_write (self, buf->data + buf->pos, buf->len - buf->pos);
if (ret < 0)
......@@ -871,9 +952,11 @@ static int send_queue(shout_t *self)
buf->pos += ret;
if (buf->pos == buf->len) {
self->queue = buf->next;
self->wqueue = buf->next;
free(buf);
buf = self->queue;
buf = self->wqueue;
if (buf)
buf->prev = NULL;
} else /* incomplete write */
return SHOUTERR_SUCCESS;
}
......@@ -962,8 +1045,9 @@ static char *http_basic_authorization(shout_t *self)
static int login_http_basic(shout_t *self)
{
char header[4096];
http_parser_t *parser;
char *header;
int hlen = 0;
int code;
char *retcode;
#if 0
......@@ -983,13 +1067,21 @@ static int login_http_basic(shout_t *self)
if (send_queue(self) != SHOUTERR_SUCCESS)
return self->error;
if (_shout_util_read_header(self->socket, header, 4096) == 0)
/* either we didn't get a complete header, or we timed out */
return self->error = SHOUTERR_SOCKET;
while ((code = get_response(self)) == SHOUTERR_BUSY);
if (code != SHOUTERR_SUCCESS)
return code;
/* all this copying! */
hlen = collect_queue(self->rqueue, &header);
if (hlen <= 0)
return SHOUTERR_MALLOC;
queue_free(self->rqueue);
self->rqueue = NULL;
parser = httpp_create_parser();
httpp_initialize(parser, NULL);
if (httpp_parse_response(parser, header, strlen(header), self->mount)) {
if (httpp_parse_response(parser, header, hlen, self->mount)) {
free (header);
retcode = httpp_getvar(parser, HTTPP_VAR_ERROR_CODE);
code = atoi(retcode);
if(code >= 200 && code < 300) {
......@@ -1035,6 +1127,7 @@ static int login_http_basic(shout_t *self)
#endif
}
free(header);
httpp_destroy(parser);
return self->error = SHOUTERR_NOLOGIN;
}
......@@ -1088,7 +1181,7 @@ static int login_xaudiocast(shout_t *self)
return SHOUTERR_SUCCESS;
}
int login_icy(shout_t *self)
static int login_icy(shout_t *self)
{
char response[4096];
const char *bitrate;
......
......@@ -37,6 +37,7 @@ typedef struct _shout_buf {
unsigned int len;
unsigned int pos;
struct _shout_buf *prev;
struct _shout_buf *next;
} shout_buf_t;
......@@ -82,7 +83,8 @@ struct shout {
int (*send)(shout_t* self, const unsigned char* buff, size_t len);
void (*close)(shout_t* self);
shout_buf_t *queue;
shout_buf_t *rqueue;
shout_buf_t *wqueue;
/* start of this period's timeclock */
uint64_t starttime;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment