Commit 2a2938b6 authored by Karl Heyes's avatar Karl Heyes

merge from branch. push clients count handling to the client_create/_destroy

functions. call client_create in the general handler and pass client_t to the
specific handler including the stats request handler, which now logs in the
access log.

svn path=/icecast/trunk/icecast/; revision=9220
parent acc79b77
...@@ -27,8 +27,10 @@ ...@@ -27,8 +27,10 @@
#include "avl/avl.h" #include "avl/avl.h"
#include "httpp/httpp.h" #include "httpp/httpp.h"
#include "cfgfile.h"
#include "connection.h" #include "connection.h"
#include "refbuf.h" #include "refbuf.h"
#include "stats.h"
#include "client.h" #include "client.h"
#include "logging.h" #include "logging.h"
...@@ -38,7 +40,23 @@ ...@@ -38,7 +40,23 @@
client_t *client_create(connection_t *con, http_parser_t *parser) client_t *client_create(connection_t *con, http_parser_t *parser)
{ {
ice_config_t *config = config_get_config ();
client_t *client = (client_t *)calloc(1, sizeof(client_t)); client_t *client = (client_t *)calloc(1, sizeof(client_t));
int client_limit = config->client_limit;
config_release_config ();
global_lock();
if (global.clients >= client_limit || client == NULL)
{
client_limit = global.clients;
global_unlock();
free (client);
WARN1 ("server client limit reached (%d clients)", client_limit);
return NULL;
}
global.clients++;
stats_event_args (NULL, "clients", "%d", global.clients);
global_unlock();
client->con = con; client->con = con;
client->parser = parser; client->parser = parser;
...@@ -61,6 +79,11 @@ void client_destroy(client_t *client) ...@@ -61,6 +79,11 @@ void client_destroy(client_t *client)
connection_close(client->con); connection_close(client->con);
httpp_destroy(client->parser); httpp_destroy(client->parser);
global_lock ();
global.clients--;
stats_event_args (NULL, "clients", "%d", global.clients);
global_unlock ();
/* drop ref counts if need be */ /* drop ref counts if need be */
if (client->refbuf) if (client->refbuf)
refbuf_release (client->refbuf); refbuf_release (client->refbuf);
......
...@@ -500,7 +500,21 @@ int connection_complete_source (source_t *source) ...@@ -500,7 +500,21 @@ int connection_complete_source (source_t *source)
* so we only do this once we know we're going to accept the source. * so we only do this once we know we're going to accept the source.
*/ */
if (source->client == NULL) if (source->client == NULL)
{
source->client = client_create (source->con, source->parser); source->client = client_create (source->con, source->parser);
if (source->client == NULL)
{
config_release_config();
global_lock();
global.sources--;
global_unlock();
connection_close (source->con);
source->con = NULL;
httpp_destroy (source->parser);
source->parser = NULL;
return -1;
}
}
while (mountproxy) while (mountproxy)
{ {
...@@ -686,14 +700,10 @@ int connection_check_source_pass(http_parser_t *parser, char *mount) ...@@ -686,14 +700,10 @@ int connection_check_source_pass(http_parser_t *parser, char *mount)
} }
static void _handle_source_request(connection_t *con, static void _handle_source_request (client_t *client, char *uri, int auth_style)
http_parser_t *parser, char *uri, int auth_style)
{ {
client_t *client;
source_t *source; source_t *source;
client = client_create(con, parser);
INFO1("Source logging in at mountpoint \"%s\"", uri); INFO1("Source logging in at mountpoint \"%s\"", uri);
if (uri[0] != '/') if (uri[0] != '/')
...@@ -702,9 +712,9 @@ static void _handle_source_request(connection_t *con, ...@@ -702,9 +712,9 @@ static void _handle_source_request(connection_t *con,
client_send_401 (client); client_send_401 (client);
return; return;
} }
if (auth_style == ICECAST_SOURCE_AUTH) { if (auth_style == ICECAST_SOURCE_AUTH) {
if (!connection_check_source_pass(parser, uri)) { if (connection_check_source_pass (client->parser, uri) == 0)
{
/* We commonly get this if the source client is using the wrong /* We commonly get this if the source client is using the wrong
* protocol: attempt to diagnose this and return an error * protocol: attempt to diagnose this and return an error
*/ */
...@@ -721,8 +731,8 @@ static void _handle_source_request(connection_t *con, ...@@ -721,8 +731,8 @@ static void _handle_source_request(connection_t *con,
source->shoutcast_compat = 1; source->shoutcast_compat = 1;
} }
source->client = client; source->client = client;
source->parser = parser; source->parser = client->parser;
source->con = con; source->con = client->con;
if (connection_complete_source (source) < 0) if (connection_complete_source (source) < 0)
{ {
source->client = NULL; source->client = NULL;
...@@ -740,35 +750,25 @@ static void _handle_source_request(connection_t *con, ...@@ -740,35 +750,25 @@ static void _handle_source_request(connection_t *con,
} }
static void _handle_stats_request(connection_t *con, static void _handle_stats_request (client_t *client, char *uri)
http_parser_t *parser, char *uri)
{ {
stats_connection_t *stats;
stats_event_inc(NULL, "stats_connections"); stats_event_inc(NULL, "stats_connections");
if (!connection_check_admin_pass(parser)) { if (connection_check_admin_pass (client->parser) == 0)
{
client_send_401 (client);
ERROR0("Bad password for stats connection"); ERROR0("Bad password for stats connection");
connection_close(con);
httpp_destroy(parser);
return; return;
} }
stats_event_inc(NULL, "stats"); stats_event_inc(NULL, "stats");
/* create stats connection and create stats handler thread */ thread_create("Stats Connection", stats_connection, (void *)client, THREAD_DETACHED);
stats = (stats_connection_t *)malloc(sizeof(stats_connection_t));
stats->parser = parser;
stats->con = con;
thread_create("Stats Connection", stats_connection, (void *)stats, THREAD_DETACHED);
} }
static void _handle_get_request(connection_t *con, static void _handle_get_request (client_t *client, char *passed_uri)
http_parser_t *parser, char *passed_uri)
{ {
char *fullpath; char *fullpath;
client_t *client;
int bytes; int bytes;
struct stat statbuf; struct stat statbuf;
source_t *source; source_t *source;
...@@ -780,7 +780,6 @@ static void _handle_get_request(connection_t *con, ...@@ -780,7 +780,6 @@ static void _handle_get_request(connection_t *con,
int serverport = 0; int serverport = 0;
aliases *alias; aliases *alias;
ice_config_t *config; ice_config_t *config;
int client_limit;
int ret; int ret;
char *uri = passed_uri; char *uri = passed_uri;
...@@ -790,14 +789,13 @@ static void _handle_get_request(connection_t *con, ...@@ -790,14 +789,13 @@ static void _handle_get_request(connection_t *con,
host = strdup (config->hostname); host = strdup (config->hostname);
port = config->port; port = config->port;
for(i = 0; i < global.server_sockets; i++) { for(i = 0; i < global.server_sockets; i++) {
if(global.serversock[i] == con->serversock) { if(global.serversock[i] == client->con->serversock) {
serverhost = config->listeners[i].bind_address; serverhost = config->listeners[i].bind_address;
serverport = config->listeners[i].port; serverport = config->listeners[i].port;
break; break;
} }
} }
alias = config->aliases; alias = config->aliases;
client_limit = config->client_limit;
/* there are several types of HTTP GET clients /* there are several types of HTTP GET clients
** media clients, which are looking for a source (eg, URI = /stream.ogg) ** media clients, which are looking for a source (eg, URI = /stream.ogg)
...@@ -820,8 +818,6 @@ static void _handle_get_request(connection_t *con, ...@@ -820,8 +818,6 @@ static void _handle_get_request(connection_t *con,
} }
config_release_config(); config_release_config();
/* make a client */
client = client_create(con, parser);
stats_event_inc(NULL, "client_connections"); stats_event_inc(NULL, "client_connections");
/* Dispatch all admin requests */ /* Dispatch all admin requests */
...@@ -894,16 +890,6 @@ static void _handle_get_request(connection_t *con, ...@@ -894,16 +890,6 @@ static void _handle_get_request(connection_t *con,
} }
free (host); free (host);
global_lock();
if (global.clients >= client_limit) {
global_unlock();
client_send_404(client,
"The server is already full. Try again later.");
if (uri != passed_uri) free (uri);
return;
}
global_unlock();
avl_tree_rlock(global.source_tree); avl_tree_rlock(global.source_tree);
source = source_find_mount(uri); source = source_find_mount(uri);
if (source) { if (source) {
...@@ -949,21 +935,12 @@ static void _handle_get_request(connection_t *con, ...@@ -949,21 +935,12 @@ static void _handle_get_request(connection_t *con,
} }
} }
/* And then check that there's actually room in the server... */
global_lock(); global_lock();
if (global.clients >= client_limit) {
global_unlock();
avl_tree_unlock(global.source_tree);
client_send_404(client,
"The server is already full. Try again later.");
if (uri != passed_uri) free (uri);
return;
}
/* Early-out for per-source max listeners. This gets checked again /* Early-out for per-source max listeners. This gets checked again
* by the source itself, later. This route gives a useful message to * by the source itself, later. This route gives a useful message to
* the client, also. * the client, also.
*/ */
else if(source->max_listeners != -1 && if (source->max_listeners != -1 &&
source->listeners >= source->max_listeners) source->listeners >= source->max_listeners)
{ {
global_unlock(); global_unlock();
...@@ -973,7 +950,6 @@ static void _handle_get_request(connection_t *con, ...@@ -973,7 +950,6 @@ static void _handle_get_request(connection_t *con,
if (uri != passed_uri) free (uri); if (uri != passed_uri) free (uri);
return; return;
} }
global.clients++;
global_unlock(); global_unlock();
source->format->create_client_data (source, client); source->format->create_client_data (source, client);
...@@ -1048,19 +1024,19 @@ void _handle_shoutcast_compatible(connection_t *con, char *mount, char *source_p ...@@ -1048,19 +1024,19 @@ void _handle_shoutcast_compatible(connection_t *con, char *mount, char *source_p
"SOURCE %s HTTP/1.0\r\n%s", mount, header); "SOURCE %s HTTP/1.0\r\n%s", mount, header);
parser = httpp_create_parser(); parser = httpp_create_parser();
httpp_initialize(parser, NULL); httpp_initialize(parser, NULL);
if (httpp_parse(parser, http_compliant, strlen(http_compliant))) { if (httpp_parse (parser, http_compliant, strlen(http_compliant)))
_handle_source_request(con, parser, mount, SHOUTCAST_SOURCE_AUTH); {
free(http_compliant); client_t *client = client_create (con, parser);
return; if (client)
} {
else { _handle_source_request (client, mount, SHOUTCAST_SOURCE_AUTH);
ERROR0("Invalid source request"); free (http_compliant);
connection_close(con); return;
free(http_compliant); }
httpp_destroy(parser);
return;
} }
return; connection_close (con);
httpp_destroy (parser);
free (http_compliant);
} }
static void *_handle_connection(void *arg) static void *_handle_connection(void *arg)
...@@ -1145,25 +1121,36 @@ static void *_handle_connection(void *arg) ...@@ -1145,25 +1121,36 @@ static void *_handle_connection(void *arg)
rawuri = httpp_getvar(parser, HTTPP_VAR_URI); rawuri = httpp_getvar(parser, HTTPP_VAR_URI);
uri = util_normalise_uri(rawuri); uri = util_normalise_uri(rawuri);
if(!uri) { if (uri == NULL)
client = client_create(con, parser); {
client_send_404(client, "The path you requested was invalid"); sock_write(con->sock, "The path you requested was invalid\r\n");
connection_close(con);
httpp_destroy(parser);
continue;
}
client = client_create (con, parser);
if (client == NULL)
{
sock_write (con->sock, "HTTP/1.0 404 File Not Found\r\n"
"Content-Type: text/html\r\n\r\n"
"<b>Connection limit reached</b>");
connection_close(con);
httpp_destroy(parser);
continue; continue;
} }
if (parser->req_type == httpp_req_source) { if (parser->req_type == httpp_req_source) {
_handle_source_request(con, parser, uri, ICECAST_SOURCE_AUTH); _handle_source_request (client, uri, ICECAST_SOURCE_AUTH);
} }
else if (parser->req_type == httpp_req_stats) { else if (parser->req_type == httpp_req_stats) {
_handle_stats_request(con, parser, uri); _handle_stats_request (client, uri);
} }
else if (parser->req_type == httpp_req_get) { else if (parser->req_type == httpp_req_get) {
_handle_get_request(con, parser, uri); _handle_get_request (client, uri);
} }
else { else {
ERROR0("Wrong request type from client"); ERROR0("Wrong request type from client");
connection_close(con); client_send_400 (client, "unknown request");
httpp_destroy(parser);
} }
free(uri); free(uri);
......
...@@ -224,7 +224,6 @@ static void wait_for_fds() { ...@@ -224,7 +224,6 @@ static void wait_for_fds() {
active_list = to_move; active_list = to_move;
client_tree_changed = 1; client_tree_changed = 1;
fserve_clients++; fserve_clients++;
stats_event_inc(NULL, "clients");
} }
pending_list = NULL; pending_list = NULL;
thread_mutex_unlock (&pending_lock); thread_mutex_unlock (&pending_lock);
...@@ -303,10 +302,6 @@ static void *fserv_thread_function(void *arg) ...@@ -303,10 +302,6 @@ static void *fserv_thread_function(void *arg)
fserve_t *to_go = (fserve_t *)pending_list; fserve_t *to_go = (fserve_t *)pending_list;
pending_list = to_go->next; pending_list = to_go->next;
/* Argh! _free_client decrements "clients" in stats, but it hasn't been
incremented if the client is still on the pending list. So, fix that
up first. Messy. */
stats_event_inc(NULL, "clients");
_free_client (to_go); _free_client (to_go);
} }
thread_mutex_unlock (&pending_lock); thread_mutex_unlock (&pending_lock);
...@@ -395,21 +390,6 @@ int fserve_client_create(client_t *httpclient, char *path) ...@@ -395,21 +390,6 @@ int fserve_client_create(client_t *httpclient, char *path)
client->content_length = (int64_t)file_buf.st_size; client->content_length = (int64_t)file_buf.st_size;
} }
global_lock();
if(global.clients >= client_limit) {
global_unlock();
httpclient->respcode = 504;
bytes = sock_write(httpclient->con->sock,
"HTTP/1.0 504 Server Full\r\n"
"Content-Type: text/html\r\n\r\n"
"<b>Server is full, try again later.</b>\r\n");
if(bytes > 0) httpclient->con->sent_bytes = bytes;
fserve_client_destroy(client);
return -1;
}
global.clients++;
global_unlock();
range = httpp_getvar (client->client->parser, "range"); range = httpp_getvar (client->client->parser, "range");
if (range != NULL) { if (range != NULL) {
...@@ -510,11 +490,6 @@ static int _free_client(void *key) ...@@ -510,11 +490,6 @@ static int _free_client(void *key)
fserve_t *client = (fserve_t *)key; fserve_t *client = (fserve_t *)key;
fserve_client_destroy(client); fserve_client_destroy(client);
global_lock();
global.clients--;
global_unlock();
stats_event_dec(NULL, "clients");
return 1; return 1;
} }
......
...@@ -219,9 +219,6 @@ void source_clear_source (source_t *source) ...@@ -219,9 +219,6 @@ void source_clear_source (source_t *source)
avl_tree_rlock (source->pending_tree); avl_tree_rlock (source->pending_tree);
while (avl_get_first (source->pending_tree)) while (avl_get_first (source->pending_tree))
{ {
/* _free_client decrements client count, so increment it first... */
stats_event_inc(NULL, "clients");
avl_delete (source->pending_tree, avl_delete (source->pending_tree,
avl_get_first(source->pending_tree)->key, _free_client); avl_get_first(source->pending_tree)->key, _free_client);
} }
...@@ -697,9 +694,6 @@ void source_main (source_t *source) ...@@ -697,9 +694,6 @@ void source_main (source_t *source)
client_node = avl_get_first(source->pending_tree); client_node = avl_get_first(source->pending_tree);
while (client_node) { while (client_node) {
/* We have to do this first, since _free_client decrements it... */
stats_event_inc(NULL, "clients");
if(source->max_listeners != -1 && if(source->max_listeners != -1 &&
source->listeners >= source->max_listeners) source->listeners >= source->max_listeners)
{ {
...@@ -836,11 +830,6 @@ static int _free_client(void *key) ...@@ -836,11 +830,6 @@ static int _free_client(void *key)
{ {
client_t *client = (client_t *)key; client_t *client = (client_t *)key;
global_lock();
global.clients--;
global_unlock();
stats_event_dec(NULL, "clients");
client_destroy(client); client_destroy(client);
return 1; return 1;
......
...@@ -679,15 +679,18 @@ static stats_event_t *_get_event_from_queue(stats_event_t **queue) ...@@ -679,15 +679,18 @@ static stats_event_t *_get_event_from_queue(stats_event_t **queue)
return event; return event;
} }
static int _send_event_to_client(stats_event_t *event, connection_t *con) static int _send_event_to_client(stats_event_t *event, client_t *client)
{ {
int ret; int ret = -1, len;
char buf [200];
/* send data to the client!!!! */ /* send data to the client!!!! */
ret = sock_write(con->sock, "EVENT %s %s %s\n", len = snprintf (buf, sizeof (buf), "EVENT %s %s %s\n",
(event->source != NULL) ? event->source : "global", (event->source != NULL) ? event->source : "global",
event->name ? event->name : "null", event->name ? event->name : "null",
event->value ? event->value : "null"); event->value ? event->value : "null");
if (len > 0 && len < sizeof (buf))
ret = client_send_bytes (client, buf, len);
return (ret == -1) ? 0 : 1; return (ret == -1) ? 0 : 1;
} }
...@@ -775,7 +778,7 @@ static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex) ...@@ -775,7 +778,7 @@ static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex)
void *stats_connection(void *arg) void *stats_connection(void *arg)
{ {
stats_connection_t *statcon = (stats_connection_t *)arg; client_t *client = (client_t *)arg;
stats_event_t *local_event_queue = NULL; stats_event_t *local_event_queue = NULL;
mutex_t local_event_mutex; mutex_t local_event_mutex;
stats_event_t *event; stats_event_t *event;
...@@ -785,6 +788,7 @@ void *stats_connection(void *arg) ...@@ -785,6 +788,7 @@ void *stats_connection(void *arg)
/* increment the thread count */ /* increment the thread count */
thread_mutex_lock(&_stats_mutex); thread_mutex_lock(&_stats_mutex);
_stats_threads++; _stats_threads++;
stats_event_args (NULL, "stats", "%d", _stats_threads);
thread_mutex_unlock(&_stats_mutex); thread_mutex_unlock(&_stats_mutex);
thread_mutex_create(&local_event_mutex); thread_mutex_create(&local_event_mutex);
...@@ -795,7 +799,7 @@ void *stats_connection(void *arg) ...@@ -795,7 +799,7 @@ void *stats_connection(void *arg)
thread_mutex_lock(&local_event_mutex); thread_mutex_lock(&local_event_mutex);
event = _get_event_from_queue(&local_event_queue); event = _get_event_from_queue(&local_event_queue);
if (event != NULL) { if (event != NULL) {
if (!_send_event_to_client(event, statcon->con)) { if (!_send_event_to_client(event, client)) {
_free_event(event); _free_event(event);
thread_mutex_unlock(&local_event_mutex); thread_mutex_unlock(&local_event_mutex);
break; break;
...@@ -813,9 +817,11 @@ void *stats_connection(void *arg) ...@@ -813,9 +817,11 @@ void *stats_connection(void *arg)
thread_mutex_lock(&_stats_mutex); thread_mutex_lock(&_stats_mutex);
_unregister_listener (&local_event_queue); _unregister_listener (&local_event_queue);
_stats_threads--; _stats_threads--;
stats_event_args (NULL, "stats", "%d", _stats_threads);
thread_mutex_unlock(&_stats_mutex); thread_mutex_unlock(&_stats_mutex);
thread_mutex_destroy(&local_event_mutex); thread_mutex_destroy(&local_event_mutex);
client_destroy (client);
INFO0 ("stats client finished"); INFO0 ("stats client finished");
return NULL; return NULL;
......
...@@ -21,12 +21,6 @@ ...@@ -21,12 +21,6 @@
#include <libxml/tree.h> #include <libxml/tree.h>
typedef struct _stats_connection_tag
{
connection_t *con;
http_parser_t *parser;
} stats_connection_t;
typedef struct _stats_node_tag