Commit 0abd6b3b authored by brendan's avatar brendan
Browse files

nonblocking API WIP. Don't use yet, this is just to keep from losing work.

Everything should still work in blocking mode.

svn path=/icecast/trunk/libshout/; revision=7354
parent 4af4e9ef
......@@ -139,6 +139,11 @@ unsigned int shout_get_format(shout_t *self);
int shout_set_protocol(shout_t *self, unsigned int protocol);
unsigned int shout_get_protocol(shout_t *self);
/* Instructs libshout to use nonblocking I/O. Must be called before
* shout_open (no switching back and forth midstream at the moment). */
int shout_set_nonblocking(shout_t* self, unsigned int nonblocking);
unsigned int shout_get_nonblocking(shout_t *self);
/* Opens a connection to the server. All parameters must already be set */
int shout_open(shout_t *self);
......
......@@ -45,11 +45,17 @@ static int queue_printf(shout_t *self, const char *fmt, ...);
static void queue_free(shout_buf_t *queue);
static int send_queue(shout_t *self);
static int get_response(shout_t *self);
static int try_connect (shout_t *self);
static int try_write (shout_t *self, const void *data, size_t len);
static int login_xaudiocast(shout_t *self);
static int login_icy(shout_t *self);
static int login_http_basic(shout_t *self);
static int create_request(shout_t *self);
static int create_http_request(shout_t *self);
static int create_xaudiocast_request(shout_t *self);
static int create_icy_request(shout_t *self);
static int parse_response(shout_t *self);
static int parse_http_response(shout_t *self);
static int parse_xaudiocast_response(shout_t *self);
static char *http_basic_authorization(shout_t *self);
/* -- static data -- */
......@@ -138,61 +144,14 @@ int shout_open(shout_t *self)
/* sanity check */
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return SHOUTERR_CONNECTED;
if (!self->host || !self->password || !self->port)
return self->error = SHOUTERR_INSANE;
if (self->format == SHOUT_FORMAT_OGG && self->protocol != SHOUT_PROTOCOL_HTTP)
return self->error = SHOUTERR_UNSUPPORTED;
if(self->protocol != SHOUT_PROTOCOL_HTTP) {
if (self->protocol == SHOUT_PROTOCOL_ICY)
self->socket = sock_connect(self->host, self->port+1);
else
self->socket = sock_connect(self->host, self->port);
if (self->socket <= 0)
return self->error = SHOUTERR_NOCONNECT;
}
if (self->protocol == SHOUT_PROTOCOL_HTTP) {
if ((self->error = login_http_basic(self)) != SHOUTERR_SUCCESS) {
return self->error;
}
} else if (self->protocol == SHOUT_PROTOCOL_XAUDIOCAST) {
if ((self->error = login_xaudiocast(self)) != SHOUTERR_SUCCESS) {
sock_close(self->socket);
return self->error;
}
} else if (self->protocol == SHOUT_PROTOCOL_ICY) {
if ((self->error = login_icy(self)) != SHOUTERR_SUCCESS) {
sock_close(self->socket);
return self->error;
}
} else
return self->error = SHOUTERR_INSANE;
if (self->format == SHOUT_FORMAT_OGG) {
if ((self->error = shout_open_ogg(self)) != SHOUTERR_SUCCESS) {
sock_close(self->socket);
return self->error;
}
} else if (self->format == SHOUT_FORMAT_MP3) {
if ((self->error = shout_open_mp3(self)) != SHOUTERR_SUCCESS) {
sock_close(self->socket);
return self->error;
}
} else {
sock_close(self->socket);
return self->error = SHOUTERR_INSANE;
}
self->connected = 1;
return self->error;
return self->error = try_connect(self);
}
......@@ -201,14 +160,14 @@ int shout_close(shout_t *self)
if (!self)
return SHOUTERR_INSANE;
if (!self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_UNCONNECTED;
if (self->close)
self->close(self);
sock_close(self->socket);
self->connected = 0;
self->state = SHOUT_STATE_UNCONNECTED;
return self->error = SHOUTERR_SUCCESS;
}
......@@ -218,7 +177,7 @@ int shout_send(shout_t *self, const unsigned char *data, size_t len)
if (!self)
return SHOUTERR_INSANE;
if (!self->connected)
if (self->state != SHOUT_STATE_CONNECTED)
return self->error = SHOUTERR_UNCONNECTED;
if (self->starttime <= 0)
......@@ -235,7 +194,7 @@ ssize_t shout_send_raw(shout_t *self, const unsigned char *data, size_t len)
if (!self)
return SHOUTERR_INSANE;
if (!self->connected)
if (self->state != SHOUT_STATE_CONNECTED)
return SHOUTERR_UNCONNECTED;
self->error = SHOUTERR_SUCCESS;
......@@ -392,12 +351,29 @@ const char *shout_get_error(shout_t *self)
}
}
/* Returns:
* SHOUTERR_CONNECTED if the connection is open,
* SHOUTERR_UNCONNECTED if it has not yet been opened,
* or an error from try_connect, including SHOUTERR_BUSY
*/
int shout_get_connected(shout_t *self)
{
if (self->connected)
int rc;
if (!self)
return SHOUTERR_INSANE;
if (self->state == SHOUT_STATE_CONNECTED)
return SHOUTERR_CONNECTED;
else
return SHOUTERR_UNCONNECTED;
if (self->state != SHOUT_STATE_UNCONNECTED) {
if ((rc = try_connect(self)) == SHOUTERR_SUCCESS)
return SHOUTERR_CONNECTED;
if (rc == SHOUTERR_BUSY)
return SHOUTERR_UNCONNECTED;
else return rc;
}
return SHOUTERR_UNCONNECTED;
}
int shout_set_host(shout_t *self, const char *host)
......@@ -405,7 +381,7 @@ int shout_set_host(shout_t *self, const char *host)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
if (self->host)
......@@ -430,7 +406,7 @@ int shout_set_port(shout_t *self, unsigned short port)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
self->port = port;
......@@ -451,7 +427,7 @@ int shout_set_password(shout_t *self, const char *password)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
if (self->password)
......@@ -478,7 +454,7 @@ int shout_set_mount(shout_t *self, const char *mount)
if (!self || !mount)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
if (self->mount)
......@@ -509,7 +485,7 @@ int shout_set_name(shout_t *self, const char *name)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
if (self->name)
......@@ -534,7 +510,7 @@ int shout_set_url(shout_t *self, const char *url)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
if (self->url)
......@@ -559,7 +535,7 @@ int shout_set_genre(shout_t *self, const char *genre)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
if (self->genre)
......@@ -584,7 +560,7 @@ int shout_set_agent(shout_t *self, const char *agent)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
if (self->useragent)
......@@ -610,7 +586,7 @@ int shout_set_user(shout_t *self, const char *username)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
if (self->user)
......@@ -635,7 +611,7 @@ int shout_set_description(shout_t *self, const char *description)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
if (self->description)
......@@ -660,7 +636,7 @@ int shout_set_dumpfile(shout_t *self, const char *dumpfile)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return SHOUTERR_CONNECTED;
if (self->dumpfile)
......@@ -695,7 +671,7 @@ int shout_set_public(shout_t *self, unsigned int public)
if (!self || (public != 0 && public != 1))
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
self->public = public;
......@@ -716,7 +692,7 @@ int shout_set_format(shout_t *self, unsigned int format)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
if (format != SHOUT_FORMAT_OGG && format != SHOUT_FORMAT_MP3)
......@@ -740,7 +716,7 @@ int shout_set_protocol(shout_t *self, unsigned int protocol)
if (!self)
return SHOUTERR_INSANE;
if (self->connected)
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
if (protocol != SHOUT_PROTOCOL_HTTP &&
......@@ -761,6 +737,27 @@ unsigned int shout_get_protocol(shout_t *self)
return self->protocol;
}
int shout_set_nonblocking(shout_t *self, unsigned int nonblocking)
{
if (!self || (nonblocking != 0 && nonblocking != 1))
return SHOUTERR_INSANE;
if (self->state != SHOUT_STATE_UNCONNECTED)
return self->error = SHOUTERR_CONNECTED;
self->nonblocking = nonblocking;
return SHOUTERR_SUCCESS;
}
unsigned int shout_get_nonblocking(shout_t *self)
{
if (!self)
return 0;
return self->nonblocking;
}
/* -- static function definitions -- */
/* queue data in pages of SHOUT_BUFSIZE bytes */
......@@ -899,6 +896,82 @@ static int get_response(shout_t *self)
return SHOUTERR_BUSY;
}
static int try_connect (shout_t *self)
{
int rc;
int port;
/* the breaks between cases are omitted intentionally */
switch (self->state) {
case SHOUT_STATE_UNCONNECTED:
port = self->port;
if (shout_get_protocol(self) == SHOUT_PROTOCOL_ICY)
port++;
if (shout_get_nonblocking(self)) {
if ((self->socket = sock_connect_non_blocking(self->host, port)) < 0)
return self->error = SHOUTERR_NOCONNECT;
self->state = SHOUT_STATE_CONNECT_PENDING;
} else {
if ((self->socket = sock_connect(self->host, port)) < 0)
return self->error = SHOUTERR_NOCONNECT;
if ((rc = create_request(self)) != SHOUTERR_SUCCESS)
return rc;
self->state = SHOUT_STATE_REQ_PENDING;
}
case SHOUT_STATE_CONNECT_PENDING:
if (shout_get_nonblocking(self)) {
if (!sock_connected(self->socket, 0))
return SHOUTERR_BUSY;
if ((rc = create_request(self)) != SHOUTERR_SUCCESS)
return rc;
}
self->state = SHOUT_STATE_REQ_PENDING;
case SHOUT_STATE_REQ_PENDING:
do
rc = send_queue(self);
while (!shout_get_nonblocking(self) && rc == SHOUTERR_BUSY);
if (rc != SHOUTERR_SUCCESS)
return rc;
self->state = SHOUT_STATE_RESP_PENDING;
case SHOUT_STATE_RESP_PENDING:
do
rc = get_response(self);
while (!shout_get_nonblocking(self) && rc == SHOUTERR_BUSY);
if (rc != SHOUTERR_SUCCESS)
return rc;
if ((rc = parse_response(self)) != SHOUTERR_SUCCESS)
return rc;
if (self->format == SHOUT_FORMAT_OGG) {
if ((self->error = shout_open_ogg(self)) != SHOUTERR_SUCCESS) {
self->state = SHOUT_STATE_UNCONNECTED;
sock_close(self->socket);
return self->error;
}
} else if (self->format == SHOUT_FORMAT_MP3) {
if ((self->error = shout_open_mp3(self)) != SHOUTERR_SUCCESS) {
self->state = SHOUT_STATE_UNCONNECTED;
sock_close(self->socket);
return self->error;
}
} else {
self->state = SHOUT_STATE_UNCONNECTED;
sock_close(self->socket);
return self->error = SHOUTERR_INSANE;
}
case SHOUT_STATE_CONNECTED:
self->state = SHOUT_STATE_CONNECTED;
}
return SHOUTERR_SUCCESS;
}
static int try_write (shout_t *self, const void *data, size_t len)
{
int ret = sock_write_bytes (self->socket, data, len);
......@@ -942,7 +1015,7 @@ static int send_queue(shout_t *self)
int ret;
if (!self->wqueue)
return 0;
return SHOUTERR_SUCCESS;
buf = self->wqueue;
while (buf) {
......@@ -958,12 +1031,24 @@ static int send_queue(shout_t *self)
if (buf)
buf->prev = NULL;
} else /* incomplete write */
return SHOUTERR_SUCCESS;
return SHOUTERR_BUSY;
}
return self->error = SHOUTERR_SUCCESS;
}
static int create_request(shout_t *self)
{
if (self->protocol == SHOUT_PROTOCOL_HTTP)
return create_http_request(self);
else if (self->protocol == SHOUT_PROTOCOL_XAUDIOCAST)
return create_xaudiocast_request(self);
else if (self->protocol == SHOUT_PROTOCOL_ICY)
return create_icy_request(self);
return self->error = SHOUTERR_UNSUPPORTED;
}
static int create_http_request(shout_t *self)
{
char *auth;
......@@ -1043,7 +1128,18 @@ static char *http_basic_authorization(shout_t *self)
return in;
}
static int login_http_basic(shout_t *self)
static int parse_response(shout_t *self)
{
if (self->protocol == SHOUT_PROTOCOL_HTTP)
return parse_http_response(self);
else if (self->protocol == SHOUT_PROTOCOL_XAUDIOCAST ||
self->protocol == SHOUT_PROTOCOL_ICY)
return parse_xaudiocast_response(self);
return self->error = SHOUTERR_UNSUPPORTED;
}
static int parse_http_response(shout_t *self)
{
http_parser_t *parser;
char *header;
......@@ -1053,23 +1149,6 @@ static int login_http_basic(shout_t *self)
#if 0
char *realm;
#endif
self->error = SHOUTERR_SOCKET;
self->socket = sock_connect(self->host, self->port);
if (self->socket < 0)
return self->error = SHOUTERR_NOCONNECT;
/* assume we'll have to authenticate, saves round trips on basic */
if(create_http_request(self) != SHOUTERR_SUCCESS)
return self->error;
if (send_queue(self) != SHOUTERR_SUCCESS)
return self->error;
while ((code = get_response(self)) == SHOUTERR_BUSY);
if (code != SHOUTERR_SUCCESS)
return code;
/* all this copying! */
hlen = collect_queue(self->rqueue, &header);
......@@ -1088,43 +1167,6 @@ static int login_http_basic(shout_t *self)
httpp_destroy(parser);
return SHOUTERR_SUCCESS;
}
#if 0
else if(code == 401) {
/* Don't really use this right now other than to check that it's
* present.
*/
realm = httpp_getvar(parser, "www-authenticate");
if(realm) {
httpp_destroy(parser);
sock_close(self->socket);
self->socket = sock_connect(self->host, self->port);
if (self->socket <= 0)
return self->error = SHOUTERR_NOCONNECT;
if(send_http_request(self, self->user, self->password) != 0) {
sock_close(self->socket);
return self->error = SHOUTERR_SOCKET;
}
if (_shout_util_read_header(self->socket, header, 4096) == 0) {
/* either we didn't get a complete header, or we timed out */
sock_close(self->socket);
return self->error = SHOUTERR_SOCKET;
}
parser = httpp_create_parser();
httpp_initialize(parser, NULL);
if (httpp_parse_response(parser, header, strlen(header), self->mount)) {
retcode = httpp_getvar(parser, HTTPP_VAR_ERROR_CODE);
code = atoi(retcode);
if(code >= 200 && code < 300) {
httpp_destroy(parser);
return SHOUTERR_SUCCESS;
}
}
}
}
#endif
}
free(header);
......@@ -1132,9 +1174,8 @@ static int login_http_basic(shout_t *self)
return self->error = SHOUTERR_NOLOGIN;
}
static int login_xaudiocast(shout_t *self)
static int create_xaudiocast_request(shout_t *self)
{
char response[4096];
const char *bitrate;
int ret;
......@@ -1166,24 +1207,29 @@ static int login_xaudiocast(shout_t *self)
ret = SHOUTERR_SUCCESS;
} while (0);
if (ret != SHOUTERR_SUCCESS)
return ret;
return ret;
}
if ((ret = send_queue(self)) != SHOUTERR_SUCCESS)
return ret;
static int parse_xaudiocast_response(shout_t *self)
{
char *response;
if (!sock_read_line(self->socket, response, sizeof(response)))
return SHOUTERR_SOCKET;
if (collect_queue(self->rqueue, &response) <= 0)
return SHOUTERR_MALLOC;
queue_free(self->rqueue);
self->rqueue = NULL;
if (!strstr(response, "OK"))
if (!strstr(response, "OK")) {
free(response);
return SHOUTERR_NOLOGIN;
}
free(response);
return SHOUTERR_SUCCESS;
}
static int login_icy(shout_t *self)
static int create_icy_request(shout_t *self)
{
char response[4096];
const char *bitrate;
int ret;
......@@ -1211,17 +1257,5 @@ static int login_icy(shout_t *self)
ret = SHOUTERR_SUCCESS;
} while (0);
if (ret != SHOUTERR_SUCCESS)
return ret;
if ((ret = send_queue(self)) != SHOUTERR_SUCCESS)
return ret;
if (!sock_read_line(self->socket, response, sizeof(response)))
return SHOUTERR_SOCKET;
if (!strstr(response, "OK"))
return SHOUTERR_NOLOGIN;
return SHOUTERR_SUCCESS;
return ret;
}
......@@ -41,6 +41,14 @@ typedef struct _shout_buf {
struct _shout_buf *next;
} shout_buf_t;
typedef enum {
SHOUT_STATE_UNCONNECTED = 0,
SHOUT_STATE_CONNECT_PENDING,
SHOUT_STATE_REQ_PENDING,
SHOUT_STATE_RESP_PENDING,
SHOUT_STATE_CONNECTED
} shout_state_e;
struct shout {
/* hostname or IP of icecast server */
char *host;
......@@ -74,10 +82,10 @@ struct shout {
/* is this stream private? */
int public;
/* are we connected to a server? */
int connected;
/* socket the connection is on */
sock_t socket;
shout_state_e state;
int nonblocking;
void *format_data;
int (*send)(shout_t* self, const unsigned char* buff, size_t len);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment