Commit d7f1285b authored by Karl Heyes's avatar Karl Heyes

drop the thread pool of connection threads, they were using a blocking socket

on incoming connections. Now we get the accept thread to create a client_t
and mark it as a shoutcast client if need be.  Then use a single connection
thread to poll the non-blocking sockets for the headers. When complete they
get handled as usual.

svn path=/icecast/trunk/icecast/; revision=9733
parent 30ce5c5b
......@@ -39,33 +39,34 @@
#undef CATMODULE
#define CATMODULE "client"
client_t *client_create(connection_t *con, http_parser_t *parser)
/* should be called with global lock held */
int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser)
{
ice_config_t *config = config_get_config ();
ice_config_t *config;
client_t *client = (client_t *)calloc(1, sizeof(client_t));
int client_limit = config->client_limit;
config_release_config ();
int ret = -1;
if (client == NULL)
return -1;
config = config_get_config ();
global_lock();
if (global.clients >= client_limit || client == NULL)
{
client_limit = global.clients;
global_unlock();
free (client);
WARN1 ("server client limit reached (%d clients)", client_limit);
return NULL;
}
global.clients++;
stats_event_args (NULL, "clients", "%d", global.clients);
global_unlock();
if (config->client_limit < global.clients)
WARN2 ("server client limit reached (%d/%d)", config->client_limit, global.clients);
else
ret = 0;
config_release_config ();
stats_event_args (NULL, "clients", "%d", global.clients);
client->con = con;
client->parser = parser;
client->refbuf = NULL;
client->pos = 0;
client->write_to_client = format_generic_write_to_client;
*c_ptr = client;
return client;
return ret;
}
void client_destroy(client_t *client)
......@@ -110,7 +111,23 @@ void client_destroy(client_t *client)
/* helper function for reading data from a client */
int client_read_bytes (client_t *client, void *buf, unsigned len)
{
int bytes = sock_read_bytes (client->con->sock, buf, len);
int bytes;
if (client->refbuf && client->refbuf->len)
{
/* we have data to read from a refbuf first */
if (client->refbuf->len < len)
len = client->refbuf->len;
memcpy (buf, client->refbuf->data, len);
if (client->refbuf->len < len)
{
char *ptr = client->refbuf->data;
memmove (ptr, ptr+len, client->refbuf->len - len);
}
client->refbuf->len -= len;
return len;
}
bytes = sock_read_bytes (client->con->sock, buf, len);
if (bytes > 0)
return bytes;
......
......@@ -67,7 +67,7 @@ typedef struct _client_tag
} client_t;
client_t *client_create(connection_t *con, http_parser_t *parser);
int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser);
void client_destroy(client_t *client);
void client_send_504(client_t *client, char *message);
void client_send_404(client_t *client, char *message);
......
......@@ -80,10 +80,13 @@
#define SHOUTCAST_SOURCE_AUTH 1
#define ICECAST_SOURCE_AUTH 0
typedef struct con_queue_tag {
connection_t *con;
struct con_queue_tag *next;
} con_queue_t;
typedef struct client_queue_tag {
client_t *client;
int offset;
int stream_offset;
int shoutcast;
struct client_queue_tag *next;
} client_queue_t;
typedef struct _thread_queue_tag {
thread_type *thread_id;
......@@ -93,11 +96,12 @@ typedef struct _thread_queue_tag {
static mutex_t _connection_mutex;
static volatile unsigned long _current_id = 0;
static int _initialized = 0;
static thread_type *tid;
volatile static con_queue_t *_queue = NULL;
static mutex_t _queue_mutex;
static thread_queue_t *_conhands = NULL;
static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue;
static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
static mutex_t _con_queue_mutex;
static mutex_t _req_queue_mutex;
rwlock_t _source_shutdown_rwlock;
......@@ -108,7 +112,8 @@ void connection_initialize(void)
if (_initialized) return;
thread_mutex_create(&_connection_mutex);
thread_mutex_create(&_queue_mutex);
thread_mutex_create(&_con_queue_mutex);
thread_mutex_create(&_req_queue_mutex);
thread_mutex_create(&move_clients_mutex);
thread_rwlock_create(&_source_shutdown_rwlock);
thread_cond_create(&global.shutdown_cond);
......@@ -122,7 +127,8 @@ void connection_shutdown(void)
thread_cond_destroy(&global.shutdown_cond);
thread_rwlock_destroy(&_source_shutdown_rwlock);
thread_mutex_destroy(&_queue_mutex);
thread_mutex_destroy(&_con_queue_mutex);
thread_mutex_destroy(&_req_queue_mutex);
thread_mutex_destroy(&_connection_mutex);
thread_mutex_destroy(&move_clients_mutex);
......@@ -140,18 +146,18 @@ static unsigned long _next_connection_id(void)
return id;
}
connection_t *create_connection(sock_t sock, sock_t serversock, char *ip) {
connection_t *connection_create (sock_t sock, sock_t serversock, char *ip)
{
connection_t *con;
con = (connection_t *)malloc(sizeof(connection_t));
memset(con, 0, sizeof(connection_t));
con->sock = sock;
con->serversock = serversock;
con->con_time = time(NULL);
con->id = _next_connection_id();
con->ip = ip;
con->event_number = EVENT_NO_EVENT;
con->event = NULL;
con = (connection_t *)calloc(1, sizeof(connection_t));
if (con)
{
con->sock = sock;
con->serversock = serversock;
con->con_time = time(NULL);
con->id = _next_connection_id();
con->ip = ip;
}
return con;
}
......@@ -254,8 +260,11 @@ static connection_t *_accept_connection(void)
ip = (char *)malloc(MAX_ADDR_LEN);
sock = sock_accept(serversock, ip, MAX_ADDR_LEN);
if (sock >= 0) {
con = create_connection(sock, serversock, ip);
if (sock >= 0)
{
con = connection_create (sock, serversock, ip);
if (con == NULL)
free (ip);
return con;
}
......@@ -268,171 +277,207 @@ static connection_t *_accept_connection(void)
return NULL;
}
static void _add_connection(connection_t *con)
{
con_queue_t *node;
node = (con_queue_t *)malloc(sizeof(con_queue_t));
thread_mutex_lock(&_queue_mutex);
node->con = con;
node->next = (con_queue_t *)_queue;
_queue = node;
thread_mutex_unlock(&_queue_mutex);
}
static void _push_thread(thread_queue_t **queue, thread_type *thread_id)
/* add client to connection queue. At this point some header information
* has been collected, so we now pass it onto the connection thread for
* further processing
*/
static void _add_connection (client_queue_t *node)
{
/* create item */
thread_queue_t *item = (thread_queue_t *)malloc(sizeof(thread_queue_t));
item->thread_id = thread_id;
item->next = NULL;
thread_mutex_lock(&_queue_mutex);
if (*queue == NULL) {
*queue = item;
} else {
item->next = *queue;
*queue = item;
}
thread_mutex_unlock(&_queue_mutex);
thread_mutex_lock (&_con_queue_mutex);
*_con_queue_tail = node;
_con_queue_tail = (volatile client_queue_t **)&node->next;
thread_mutex_unlock (&_con_queue_mutex);
}
static thread_type *_pop_thread(thread_queue_t **queue)
{
thread_type *id;
thread_queue_t *item;
thread_mutex_lock(&_queue_mutex);
/* this returns queued clients for the connection thread. headers are
* already provided, but need to be parsed.
*/
static client_queue_t *_get_connection(void)
{
client_queue_t *node = NULL;
item = *queue;
if (item == NULL) {
thread_mutex_unlock(&_queue_mutex);
return NULL;
/* common case, no new connections so don't bother taking locks */
if (_con_queue)
{
thread_mutex_lock (&_con_queue_mutex);
node = (client_queue_t *)_con_queue;
_con_queue = node->next;
if (_con_queue == NULL)
_con_queue_tail = &_con_queue;
thread_mutex_unlock (&_con_queue_mutex);
}
return node;
}
*queue = item->next;
item->next = NULL;
id = item->thread_id;
free(item);
thread_mutex_unlock(&_queue_mutex);
/* run along queue checking for any data that has come in or a timeout */
static void process_request_queue ()
{
client_queue_t **node_ref = (client_queue_t **)&_req_queue;
ice_config_t *config = config_get_config ();
int timeout = config->header_timeout;
config_release_config();
return id;
}
while (*node_ref)
{
client_queue_t *node = *node_ref;
client_t *client = node->client;
int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset;
char *buf = client->refbuf->data + node->offset;
static void _build_pool(void)
{
ice_config_t *config;
int i;
thread_type *tid;
char buff[64];
int threadpool_size;
if (len > 0)
{
if (client->con->con_time + timeout <= time(NULL))
len = 0;
else
len = client_read_bytes (client, buf, len);
}
config = config_get_config();
threadpool_size = config->threadpool_size;
config_release_config();
if (len > 0)
{
int pass_it = 1;
char *ptr;
node->offset += len;
client->refbuf->data [node->offset] = '\000';
do
{
if (node->shoutcast == 1)
{
/* password line */
if (strstr (client->refbuf->data, "\r\n") != NULL)
break;
if (strstr (client->refbuf->data, "\n") != NULL)
break;
}
/* stream_offset refers to the start of any data sent after the
* http style headers, we don't want to lose those */
ptr = strstr (client->refbuf->data, "\r\n\r\n");
if (ptr)
{
node->stream_offset = (ptr+4) - client->refbuf->data;
break;
}
ptr = strstr (client->refbuf->data, "\n\n");
if (ptr)
{
node->stream_offset = (ptr+2) - client->refbuf->data;
break;
}
pass_it = 0;
} while (0);
for (i = 0; i < threadpool_size; i++) {
snprintf(buff, 64, "Connection Thread #%d", i);
tid = thread_create(buff, _handle_connection, NULL, THREAD_ATTACHED);
_push_thread(&_conhands, tid);
if (pass_it)
{
if ((client_queue_t **)_req_queue_tail == &(node->next))
_req_queue_tail = (volatile client_queue_t **)node_ref;
*node_ref = node->next;
node->next = NULL;
_add_connection (node);
}
}
else
{
if (len == 0 || client->con->error)
{
if ((client_queue_t **)_req_queue_tail == &node->next)
_req_queue_tail = (volatile client_queue_t **)node_ref;
*node_ref = node->next;
client_destroy (client);
free (node);
continue;
}
}
node_ref = &node->next;
}
}
static void _destroy_pool(void)
{
thread_type *id;
int i;
i = 0;
id = _pop_thread(&_conhands);
while (id != NULL) {
thread_join(id);
id = _pop_thread(&_conhands);
}
INFO0("All connection threads down");
/* add node to the queue of requests. This is where the clients are when
* initial http details are read.
*/
static void _add_request_queue (client_queue_t *node)
{
thread_mutex_lock (&_req_queue_mutex);
*_req_queue_tail = node;
_req_queue_tail = (volatile client_queue_t **)&node->next;
thread_mutex_unlock (&_req_queue_mutex);
}
void connection_accept_loop(void)
{
connection_t *con;
_build_pool();
tid = thread_create ("connection thread", _handle_connection, NULL, THREAD_ATTACHED);
while (global.running == ICE_RUNNING)
{
if (global . schedule_config_reread)
con = _accept_connection();
if (con)
{
/* reread config file */
INFO0("Scheduling config reread ...");
client_queue_t *node;
ice_config_t *config;
int i;
client_t *client = NULL;
connection_inject_event(EVENT_CONFIG_READ, NULL);
global . schedule_config_reread = 0;
}
global_lock();
if (client_create (&client, con, NULL) < 0)
{
global_unlock();
client_send_404 (client, "Icecast connection limit reached");
continue;
}
global_unlock();
con = _accept_connection();
/* setup client for reading incoming http */
client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
client->refbuf->data [PER_CLIENT_REFBUF_SIZE-1] = '\000';
client->refbuf->len = 0; /* force reader code to ignore buffer */
node = calloc (1, sizeof (client_queue_t));
if (node == NULL)
{
client_destroy (client);
continue;
}
node->client = client;
/* Check for special shoutcast compatability processing */
config = config_get_config();
for (i = 0; i < global.server_sockets; i++)
{
if (global.serversock[i] == con->serversock)
{
if (config->listeners[i].shoutcast_compat)
node->shoutcast = 1;
}
}
config_release_config();
sock_set_blocking (client->con->sock, SOCK_NONBLOCK);
sock_set_nodelay (client->con->sock);
if (con) {
_add_connection(con);
_add_request_queue (node);
stats_event_inc (NULL, "connections");
}
process_request_queue ();
}
/* Give all the other threads notification to shut down */
thread_cond_broadcast(&global.shutdown_cond);
_destroy_pool();
if (tid)
thread_join (tid);
/* wait for all the sources to shutdown */
thread_rwlock_wlock(&_source_shutdown_rwlock);
thread_rwlock_unlock(&_source_shutdown_rwlock);
}
static connection_t *_get_connection(void)
{
con_queue_t *node = NULL;
con_queue_t *oldnode = NULL;
connection_t *con = NULL;
/* common case, no new connections so don't bother taking locks */
if (_queue == NULL)
return NULL;
thread_mutex_lock(&_queue_mutex);
if (_queue) {
node = (con_queue_t *)_queue;
while (node->next) {
oldnode = node;
node = node->next;
}
/* node is now the last node
** and oldnode is the previous one, or NULL
*/
if (oldnode) oldnode->next = NULL;
else (_queue) = NULL;
}
thread_mutex_unlock(&_queue_mutex);
if (node) {
con = node->con;
free(node);
}
return con;
}
void connection_inject_event(int eventnum, void *event_data) {
connection_t *con = calloc(1, sizeof(connection_t));
con->event_number = eventnum;
con->event = event_data;
_add_connection(con);
}
/* Called when activating a source. Verifies that the source count is not
* exceeded and applies any initial parameters.
......@@ -484,10 +529,6 @@ int connection_complete_source (source_t *source)
return -1;
}
global.sources++;
stats_event_args (NULL, "sources", "%d", global.sources);
global_unlock();
/* for relays, we don't yet have a client, however we do require one
* to retrieve the stream from. This is created here, quite late,
* because we can't use this client to return an error code/message,
......@@ -495,12 +536,9 @@ int connection_complete_source (source_t *source)
*/
if (source->client == NULL)
{
source->client = client_create (source->con, source->parser);
if (source->client == NULL)
if (client_create (&source->client, source->con, source->parser) < 0)
{
config_release_config();
global_lock();
global.sources--;
global_unlock();
connection_close (source->con);
source->con = NULL;
......@@ -509,6 +547,9 @@ int connection_complete_source (source_t *source)
return -1;
}
}
global.sources++;
stats_event_args (NULL, "sources", "%d", global.sources);
global_unlock();
source->running = 1;
mountinfo = config_find_mount (config, source->mount);
......@@ -823,160 +864,141 @@ static void _handle_get_request (client_t *client, char *passed_uri)
return;
}
sock_set_blocking(client->con->sock, SOCK_NONBLOCK);
sock_set_nodelay(client->con->sock);
client->write_to_client = format_generic_write_to_client;
client->check_buffer = format_check_http_buffer;
client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
add_client (uri, client);
if (uri != passed_uri) free (uri);
}
void _handle_shoutcast_compatible(connection_t *con, char *mount, char *source_password) {
char shoutcast_password[256];
static void _handle_shoutcast_compatible (client_queue_t *node)
{
char *http_compliant;
int http_compliant_len = 0;
char header[4096];
http_parser_t *parser;
ice_config_t *config = config_get_config ();
char *shoutcast_mount;
client_t *client = node->client;
memset(shoutcast_password, 0, sizeof (shoutcast_password));
/* Step one of shoutcast auth protocol, read encoder password (1 line) */
if (util_read_header(con->sock, shoutcast_password,
sizeof (shoutcast_password),
READ_LINE) == 0) {
/* either we didn't get a complete line, or we timed out */
connection_close(con);
return;
}
/* Get rid of trailing \n */
shoutcast_password[strlen(shoutcast_password)-1] = '\000';
if (strcmp(shoutcast_password, source_password)) {
ERROR0("Invalid source password");
connection_close(con);
return;
}
/* Step two of shoutcast auth protocol, send OK2. For those
interested, OK2 means it supports metadata updates via admin.cgi,
and the string "OK" can also be sent, but will indicate to the
shoutcast source client to not send metadata updates.
I believe icecast 1.x used to send OK. */
sock_write(con->sock, "%s\r\n", "OK2");
memset(header, 0, sizeof (header));
/* Step three of shoutcast auth protocol, read HTTP-style
request headers and process them.*/
if (util_read_header(con->sock, header, sizeof (header),
READ_ENTIRE_HEADER) == 0) {
/* either we didn't get a complete header, or we timed out */
connection_close(con);
if (node->shoutcast == 1)
{
char *source_password, *ptr;
mount_proxy *mountinfo = config_find_mount (config, config->shoutcast_mount);
if (mountinfo && mountinfo->password)
source_password = strdup (mountinfo->password);
else
source_password = strdup (config->source_password);
config_release_config();
/* Get rid of trailing \r\n or \n after password */
ptr = strstr (client->refbuf->data, "\r\n");
if (ptr == NULL)
ptr = strstr (client->refbuf->data, "\n");
if (ptr == NULL)
{
client_destroy (client);
free (source_password);
free (node);
return;
}
*ptr = '\0';
if (strcmp (client->refbuf->data, source_password) == 0)
{
client->respcode = 200;
/* send this non-blocking but if there is only a partial write
* then leave to header timeout */
sock_write (client->con->sock, "OK2\r\n");
memset (client->refbuf->data, 0, client->refbuf->len);
node->shoutcast = 2;
node->offset = 0;
/* we've checked the password, now send it back for reading headers */
_add_request_queue (node);
free (source_password);
return;
}
client_destroy (client);