Commit d07723c9 authored by Karl Heyes's avatar Karl Heyes
Browse files

fixes for client handling, these are all related to the handling of max clients.

I've taken out the client_create out of the connection_complete_source and put
it in slave, that way we can control the cleanup of the memory/socket better, the
change also meant fallback to file tests were slghtly different.

svn path=/icecast/trunk/icecast/; revision=9847
parent c519afa8
...@@ -322,9 +322,12 @@ static int add_authenticated_client (const char *mount, mount_proxy *mountinfo, ...@@ -322,9 +322,12 @@ static int add_authenticated_client (const char *mount, mount_proxy *mountinfo,
avl_tree_unlock (global.source_tree); avl_tree_unlock (global.source_tree);
return -1; return -1;
} }
/* set a per-mount disconnect time if auth hasn't set one already */ if (mountinfo)
if (mountinfo->max_listener_duration && client->con->discon_time == 0) {
client->con->discon_time = time(NULL) + mountinfo->max_listener_duration; /* set a per-mount disconnect time if auth hasn't set one already */
if (mountinfo->max_listener_duration && client->con->discon_time == 0)
client->con->discon_time = time(NULL) + mountinfo->max_listener_duration;
}
ret = add_client_to_source (source, client); ret = add_client_to_source (source, client);
avl_tree_unlock (global.source_tree); avl_tree_unlock (global.source_tree);
......
...@@ -44,7 +44,11 @@ ...@@ -44,7 +44,11 @@
#undef CATMODULE #undef CATMODULE
#define CATMODULE "client" #define CATMODULE "client"
/* should be called with global lock held */ /* create a client_t with the provided connection and parser details. Return
* 0 on success, -1 if server limit has been reached. In either case a
* client_t is returned just in case a message needs to be returned. Should
* be called with global lock held.
*/
int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser) int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser)
{ {
ice_config_t *config; ice_config_t *config;
...@@ -52,7 +56,7 @@ int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser) ...@@ -52,7 +56,7 @@ int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser)
int ret = -1; int ret = -1;
if (client == NULL) if (client == NULL)
return -1; abort();
config = config_get_config (); config = config_get_config ();
...@@ -67,6 +71,8 @@ int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser) ...@@ -67,6 +71,8 @@ int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser)
stats_event_args (NULL, "clients", "%d", global.clients); stats_event_args (NULL, "clients", "%d", global.clients);
client->con = con; client->con = con;
client->parser = parser; client->parser = parser;
client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
client->refbuf->len = 0; /* force reader code to ignore buffer contents */
client->pos = 0; client->pos = 0;
client->write_to_client = format_generic_write_to_client; client->write_to_client = format_generic_write_to_client;
*c_ptr = client; *c_ptr = client;
......
...@@ -434,9 +434,7 @@ void connection_accept_loop(void) ...@@ -434,9 +434,7 @@ void connection_accept_loop(void)
global_unlock(); global_unlock();
/* setup client for reading incoming http */ /* setup client for reading incoming http */
client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
client->refbuf->data [PER_CLIENT_REFBUF_SIZE-1] = '\000'; client->refbuf->data [PER_CLIENT_REFBUF_SIZE-1] = '\000';
client->refbuf->len = 0; /* force reader code to ignore buffer */
node = calloc (1, sizeof (client_queue_t)); node = calloc (1, sizeof (client_queue_t));
if (node == NULL) if (node == NULL)
...@@ -482,7 +480,7 @@ void connection_accept_loop(void) ...@@ -482,7 +480,7 @@ void connection_accept_loop(void)
/* Called when activating a source. Verifies that the source count is not /* Called when activating a source. Verifies that the source count is not
* exceeded and applies any initial parameters. * exceeded and applies any initial parameters.
*/ */
int connection_complete_source (source_t *source) int connection_complete_source (source_t *source, int response)
{ {
ice_config_t *config = config_get_config(); ice_config_t *config = config_get_config();
...@@ -505,8 +503,11 @@ int connection_complete_source (source_t *source) ...@@ -505,8 +503,11 @@ int connection_complete_source (source_t *source)
{ {
global_unlock(); global_unlock();
config_release_config(); config_release_config();
if (source->client) if (response)
{
client_send_404 (source->client, "Content-type not supported"); client_send_404 (source->client, "Content-type not supported");
source->client = NULL;
}
WARN1("Content-type \"%s\" not supported, dropping source", contenttype); WARN1("Content-type \"%s\" not supported, dropping source", contenttype);
return -1; return -1;
} }
...@@ -522,31 +523,15 @@ int connection_complete_source (source_t *source) ...@@ -522,31 +523,15 @@ int connection_complete_source (source_t *source)
{ {
global_unlock(); global_unlock();
config_release_config(); config_release_config();
if (source->client) if (response)
{
client_send_404 (source->client, "internal format allocation problem"); client_send_404 (source->client, "internal format allocation problem");
source->client = NULL;
}
WARN1 ("plugin format failed for \"%s\"", source->mount); WARN1 ("plugin format failed for \"%s\"", source->mount);
source->client = NULL;
return -1; return -1;
} }
/* for relays, we don't yet have a client, however we do require one
* to retrieve the stream from. This is created here, quite late,
* because we can't use this client to return an error code/message,
* so we only do this once we know we're going to accept the source.
*/
if (source->client == NULL)
{
if (client_create (&source->client, source->con, source->parser) < 0)
{
config_release_config();
global_unlock();
connection_close (source->con);
source->con = NULL;
httpp_destroy (source->parser);
source->parser = NULL;
return -1;
}
}
global.sources++; global.sources++;
stats_event_args (NULL, "sources", "%d", global.sources); stats_event_args (NULL, "sources", "%d", global.sources);
global_unlock(); global_unlock();
...@@ -569,8 +554,11 @@ int connection_complete_source (source_t *source) ...@@ -569,8 +554,11 @@ int connection_complete_source (source_t *source)
global_unlock(); global_unlock();
config_release_config(); config_release_config();
if (source->client) if (response)
{
client_send_404 (source->client, "too many sources connected"); client_send_404 (source->client, "too many sources connected");
source->client = NULL;
}
return -1; return -1;
} }
...@@ -758,9 +746,8 @@ static void _handle_source_request (client_t *client, char *uri, int auth_style) ...@@ -758,9 +746,8 @@ static void _handle_source_request (client_t *client, char *uri, int auth_style)
source->client = client; source->client = client;
source->parser = client->parser; source->parser = client->parser;
source->con = client->con; source->con = client->con;
if (connection_complete_source (source) < 0) if (connection_complete_source (source, 1) < 0)
{ {
source->client = NULL;
source_free_source (source); source_free_source (source);
} }
else else
......
...@@ -45,7 +45,7 @@ void connection_shutdown(void); ...@@ -45,7 +45,7 @@ void connection_shutdown(void);
void connection_accept_loop(void); void connection_accept_loop(void);
void connection_close(connection_t *con); void connection_close(connection_t *con);
connection_t *connection_create (sock_t sock, sock_t serversock, char *ip); connection_t *connection_create (sock_t sock, sock_t serversock, char *ip);
int connection_complete_source (struct source_tag *source); int connection_complete_source (struct source_tag *source, int response);
int connection_check_source_pass(http_parser_t *parser, const char *mount); int connection_check_source_pass(http_parser_t *parser, const char *mount);
int connection_check_relay_pass(http_parser_t *parser); int connection_check_relay_pass(http_parser_t *parser);
......
...@@ -150,7 +150,7 @@ int format_check_file_buffer (source_t *source, client_t *client) ...@@ -150,7 +150,7 @@ int format_check_file_buffer (source_t *source, client_t *client)
if (refbuf == NULL) if (refbuf == NULL)
{ {
/* client refers to no data, must be from a move */ /* client refers to no data, must be from a move */
if (source->client->con) if (source->client)
{ {
find_client_start (source, client); find_client_start (source, client);
return -1; return -1;
......
...@@ -237,7 +237,18 @@ static void *start_relay_stream (void *arg) ...@@ -237,7 +237,18 @@ static void *start_relay_stream (void *arg)
} }
src->parser = parser; src->parser = parser;
src->con = con; src->con = con;
if (connection_complete_source (src) < 0)
if (client_create (&src->client, con, parser) < 0)
{
/* make sure only the client_destory frees these */
con = NULL;
parser = NULL;
streamsock = SOCK_ERROR;
break;
}
client_set_queue (src->client, NULL);
if (connection_complete_source (src, 0) < 0)
{ {
DEBUG0("Failed to complete source initialisation"); DEBUG0("Failed to complete source initialisation");
break; break;
......
...@@ -442,7 +442,7 @@ static refbuf_t *get_next_buffer (source_t *source) ...@@ -442,7 +442,7 @@ static refbuf_t *get_next_buffer (source_t *source)
int fds = 0; int fds = 0;
time_t current = time (NULL); time_t current = time (NULL);
if (source->client->con) if (source->client)
fds = util_timed_wait_for_fd (source->con->sock, delay); fds = util_timed_wait_for_fd (source->con->sock, delay);
else else
{ {
...@@ -1132,7 +1132,7 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) ...@@ -1132,7 +1132,7 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo) void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo)
{ {
/* skip if source is a fallback to file */ /* skip if source is a fallback to file */
if (source->running && source->client->con == NULL) if (source->running && source->client == NULL)
return; return;
/* set global settings first */ /* set global settings first */
source->queue_size_limit = config->queue_size_limit; source->queue_size_limit = config->queue_size_limit;
...@@ -1312,7 +1312,7 @@ static void *source_fallback_file (void *arg) ...@@ -1312,7 +1312,7 @@ static void *source_fallback_file (void *arg)
source->parser = parser; source->parser = parser;
file = NULL; file = NULL;
if (connection_complete_source (source) < 0) if (connection_complete_source (source, 0) < 0)
break; break;
source_client_thread (source); source_client_thread (source);
} while (0); } while (0);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment