Commit 80561957 authored by Karl Heyes's avatar Karl Heyes

fix a busy CPU case when slow and fast file serving clients are connected at

the same time.  Flag clients on return from select/poll and only process those.
Also fix a rare race which could leave clients in pending

svn path=/icecast/trunk/icecast/; revision=8070
parent 17537d24
...@@ -60,22 +60,22 @@ ...@@ -60,22 +60,22 @@
#define MIMETYPESFILE "/etc/mime.types" #define MIMETYPESFILE "/etc/mime.types"
#endif #endif
static avl_tree *client_tree; static fserve_t *active_list = NULL;
static avl_tree *pending_tree; static volatile fserve_t *pending_list = NULL;
static mutex_t pending_lock;
static avl_tree *mimetypes = NULL; static avl_tree *mimetypes = NULL;
static cond_t fserv_cond;
static thread_type *fserv_thread; static thread_type *fserv_thread;
static int run_fserv; static int run_fserv = 0;
static int fserve_clients; static unsigned int fserve_clients;
static int client_tree_changed=0; static int client_tree_changed=0;
#ifdef HAVE_POLL #ifdef HAVE_POLL
static struct pollfd *ufds = NULL; static struct pollfd *ufds = NULL;
static int ufdssize = 0;
#else #else
static fd_set fds; static fd_set fds;
static int fd_max = 0; static int fd_max = -1;
#endif #endif
typedef struct { typedef struct {
...@@ -83,9 +83,6 @@ typedef struct { ...@@ -83,9 +83,6 @@ typedef struct {
char *type; char *type;
} mime_type; } mime_type;
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _remove_client(void *key);
static int _free_client(void *key); static int _free_client(void *key);
static int _delete_mapping(void *mapping); static int _delete_mapping(void *mapping);
static void *fserv_thread_function(void *arg); static void *fserv_thread_function(void *arg);
...@@ -103,9 +100,7 @@ void fserve_initialize(void) ...@@ -103,9 +100,7 @@ void fserve_initialize(void)
create_mime_mappings(MIMETYPESFILE); create_mime_mappings(MIMETYPESFILE);
client_tree = avl_tree_new(_compare_clients, NULL); thread_mutex_create (&pending_lock);
pending_tree = avl_tree_new(_compare_clients, NULL);
thread_cond_create(&fserv_cond);
run_fserv = 1; run_fserv = 1;
...@@ -115,207 +110,205 @@ void fserve_initialize(void) ...@@ -115,207 +110,205 @@ void fserve_initialize(void)
void fserve_shutdown(void) void fserve_shutdown(void)
{ {
ice_config_t *config = config_get_config();
int serve = config->fileserve;
config_release_config();
if(!serve)
return;
if(!run_fserv) if(!run_fserv)
return; return;
avl_tree_free(mimetypes, _delete_mapping);
run_fserv = 0; run_fserv = 0;
thread_cond_signal(&fserv_cond);
thread_join(fserv_thread); thread_join(fserv_thread);
INFO0("file serving thread stopped");
thread_cond_destroy(&fserv_cond); avl_tree_free(mimetypes, _delete_mapping);
avl_tree_free(client_tree, _free_client);
avl_tree_free(pending_tree, _free_client);
} }
static void wait_for_fds() {
avl_node *client_node;
fserve_t *client;
int i;
while(run_fserv) {
#ifdef HAVE_POLL #ifdef HAVE_POLL
if(client_tree_changed) { int fserve_client_waiting (void)
client_tree_changed = 0; {
i = 0; fserve_t *fclient;
ufdssize = fserve_clients; unsigned int i = 0;
ufds = realloc(ufds, ufdssize * sizeof(struct pollfd));
avl_tree_rlock(client_tree);
client_node = avl_get_first(client_tree);
while(client_node) {
client = client_node->key;
ufds[i].fd = client->client->con->sock;
ufds[i].events = POLLOUT;
client_node = avl_get_next(client_node);
}
avl_tree_unlock(client_tree);
}
if(poll(ufds, ufdssize, 200) > 0) /* only rebuild ufds if there are clients added/removed */
return; if (client_tree_changed)
{
client_tree_changed = 0;
ufds = realloc(ufds, fserve_clients * sizeof(struct pollfd));
fclient = active_list;
while (fclient)
{
ufds[i].fd = fclient->client->con->sock;
ufds[i].events = POLLOUT;
ufds[i].revents = 0;
fclient = fclient->next;
i++;
}
}
if (poll(ufds, fserve_clients, 200) > 0)
{
/* mark any clients that are ready */
fclient = active_list;
for (i=0; i<fserve_clients; i++)
{
if (ufds[i].revents & (POLLOUT|POLLHUP|POLLERR))
fclient->ready = 1;
fclient = fclient->next;
}
return 1;
}
return 0;
}
#else #else
int fserve_client_waiting (void)
{
fserve_t *fclient;
fd_set realfds;
/* only rebuild fds if there are clients added/removed */
if(client_tree_changed) {
client_tree_changed = 0;
FD_ZERO(&fds);
fd_max = -1;
fclient = active_list;
while (fclient) {
FD_SET (fclient->client->con->sock, &fds);
if (fclient->client->con->sock > fd_max)
fd_max = fclient->client->con->sock;
fclient = fclient->next;
}
}
/* hack for windows, select needs at least 1 descriptor */
if (fd_max == -1)
thread_sleep (200000);
else
{
struct timeval tv; struct timeval tv;
fd_set realfds;
tv.tv_sec = 0; tv.tv_sec = 0;
tv.tv_usec = 200000; tv.tv_usec = 200000;
if(client_tree_changed) { /* make a duplicate of the set so we do not have to rebuild it
client_tree_changed = 0; * each time around */
i=0;
FD_ZERO(&fds);
fd_max = 0;
avl_tree_rlock(client_tree);
client_node = avl_get_first(client_tree);
while(client_node) {
client = client_node->key;
FD_SET(client->client->con->sock, &fds);
if(client->client->con->sock > fd_max)
fd_max = client->client->con->sock;
client_node = avl_get_next(client_node);
}
avl_tree_unlock(client_tree);
}
memcpy(&realfds, &fds, sizeof(fd_set)); memcpy(&realfds, &fds, sizeof(fd_set));
if(select(fd_max+1, NULL, &realfds, NULL, &tv) > 0) if(select(fd_max+1, NULL, &realfds, NULL, &tv) > 0)
return; {
/* mark any clients that are ready */
fclient = active_list;
while (fclient)
{
if (FD_ISSET (fclient->client->con->sock, &realfds))
fclient->ready = 1;
fclient = fclient->next;
}
return 1;
}
}
return 0;
}
#endif #endif
else {
avl_tree_rlock(pending_tree); static void wait_for_fds() {
client_node = avl_get_first(pending_tree); fserve_t *fclient;
avl_tree_unlock(pending_tree);
if(client_node) while (run_fserv)
return; {
/* add any new clients here */
if (pending_list)
{
thread_mutex_lock (&pending_lock);
fclient = (fserve_t*)pending_list;
while (fclient)
{
fserve_t *to_move = fclient;
fclient = fclient->next;
to_move->next = active_list;
active_list = to_move;
client_tree_changed = 1;
fserve_clients++;
stats_event_inc(NULL, "clients");
}
pending_list = NULL;
thread_mutex_unlock (&pending_lock);
} }
/* drop out of here is someone is ready */
if (fserve_client_waiting())
break;
} }
} }
static void *fserv_thread_function(void *arg) static void *fserv_thread_function(void *arg)
{ {
avl_node *client_node, *pending_node; fserve_t *fclient, **trail;
fserve_t *client;
int sbytes, bytes; int sbytes, bytes;
INFO0("file serving thread started");
while (run_fserv) { while (run_fserv) {
avl_tree_rlock(client_tree);
client_node = avl_get_first(client_tree);
if(!client_node) {
avl_tree_rlock(pending_tree);
pending_node = avl_get_first(pending_tree);
if(!pending_node) {
/* There are no current clients. Wait until there are... */
avl_tree_unlock(pending_tree);
avl_tree_unlock(client_tree);
thread_cond_wait(&fserv_cond);
continue;
}
avl_tree_unlock(pending_tree);
}
/* This isn't hugely efficient, but it'll do for now */
avl_tree_unlock(client_tree);
wait_for_fds(); wait_for_fds();
avl_tree_rlock(client_tree); fclient = active_list;
client_node = avl_get_first(client_tree); trail = &active_list;
while(client_node) { while (fclient)
avl_node_wlock(client_node); {
/* process this client, if it is ready */
client = (fserve_t *)client_node->key; if (fclient->ready)
{
if(client->offset >= client->datasize) { fclient->ready = 0;
/* Grab a new chunk */ if(fclient->offset >= fclient->datasize) {
bytes = fread(client->buf, 1, BUFSIZE, client->file); /* Grab a new chunk */
if(bytes <= 0) { bytes = fread(fclient->buf, 1, BUFSIZE, fclient->file);
client->client->con->error = 1; if (bytes == 0)
avl_node_unlock(client_node); {
client_node = avl_get_next(client_node); fserve_t *to_go = fclient;
continue; fclient = fclient->next;
*trail = fclient;
_free_client (to_go);
fserve_clients--;
client_tree_changed = 1;
continue;
}
fclient->offset = 0;
fclient->datasize = bytes;
} }
client->offset = 0;
client->datasize = bytes;
}
/* Now try and send current chunk. */ /* Now try and send current chunk. */
sbytes = client_send_bytes (client->client, sbytes = client_send_bytes (fclient->client,
&client->buf[client->offset], &fclient->buf[fclient->offset],
client->datasize - client->offset); fclient->datasize - fclient->offset);
/* TODO: remove clients if they take too long. */ /* TODO: remove clients if they take too long. */
if(sbytes >= 0) { if(sbytes > 0) {
client->offset += sbytes; fclient->offset += sbytes;
} }
avl_node_unlock(client_node);
client_node = avl_get_next(client_node);
}
avl_tree_unlock(client_tree);
/* Now we need a write lock instead, to delete done clients. */
avl_tree_wlock(client_tree);
client_node = avl_get_first(client_tree); if (fclient->client->con->error)
while(client_node) { {
client = (fserve_t *)client_node->key; fserve_t *to_go = fclient;
if(client->client->con->error) { fclient = fclient->next;
fserve_clients--; *trail = fclient;
client_node = avl_get_next(client_node); fserve_clients--;
avl_delete(client_tree, (void *)client, _free_client); _free_client (to_go);
client_tree_changed = 1; client_tree_changed = 1;
continue; continue;
}
} }
client_node = avl_get_next(client_node); trail = &fclient->next;
fclient = fclient->next;
} }
avl_tree_wlock(pending_tree);
/* And now insert new clients. */
client_node = avl_get_first(pending_tree);
while(client_node) {
client = (fserve_t *)client_node->key;
avl_insert(client_tree, client);
client_tree_changed = 1;
fserve_clients++;
stats_event_inc(NULL, "clients");
client_node = avl_get_next(client_node);
}
/* clear pending */
while(avl_get_first(pending_tree)) {
avl_delete(pending_tree, avl_get_first(pending_tree)->key,
_remove_client);
}
avl_tree_unlock(pending_tree);
avl_tree_unlock(client_tree);
} }
/* Shutdown path */ /* Shutdown path */
thread_mutex_lock (&pending_lock);
while (pending_list)
{
fserve_t *to_go = (fserve_t *)pending_list;
pending_list = to_go->next;
_free_client (to_go);
}
thread_mutex_unlock (&pending_lock);
avl_tree_wlock(pending_tree); while (active_list)
while(avl_get_first(pending_tree)) {
avl_delete(pending_tree, avl_get_first(pending_tree)->key, fserve_t *to_go = active_list;
_free_client); active_list = to_go->next;
avl_tree_unlock(pending_tree); _free_client (to_go);
}
avl_tree_wlock(client_tree);
while(avl_get_first(client_tree))
avl_delete(client_tree, avl_get_first(client_tree)->key,
_free_client);
avl_tree_unlock(client_tree);
thread_exit(0);
return NULL; return NULL;
} }
...@@ -378,6 +371,7 @@ int fserve_client_create(client_t *httpclient, char *path) ...@@ -378,6 +371,7 @@ int fserve_client_create(client_t *httpclient, char *path)
client->client = httpclient; client->client = httpclient;
client->offset = 0; client->offset = 0;
client->datasize = 0; client->datasize = 0;
client->ready = 0;
client->buf = malloc(BUFSIZE); client->buf = malloc(BUFSIZE);
global_lock(); global_lock();
...@@ -405,34 +399,14 @@ int fserve_client_create(client_t *httpclient, char *path) ...@@ -405,34 +399,14 @@ int fserve_client_create(client_t *httpclient, char *path)
sock_set_blocking(client->client->con->sock, SOCK_NONBLOCK); sock_set_blocking(client->client->con->sock, SOCK_NONBLOCK);
sock_set_nodelay(client->client->con->sock); sock_set_nodelay(client->client->con->sock);
avl_tree_wlock(pending_tree); thread_mutex_lock (&pending_lock);
avl_insert(pending_tree, client); client->next = (fserve_t *)pending_list;
avl_tree_unlock(pending_tree); pending_list = client;
thread_mutex_unlock (&pending_lock);
thread_cond_signal(&fserv_cond);
return 0; return 0;
} }
static int _compare_clients(void *compare_arg, void *a, void *b)
{
fserve_t *clienta = (fserve_t *)a;
fserve_t *clientb = (fserve_t *)b;
connection_t *cona = clienta->client->con;
connection_t *conb = clientb->client->con;
if (cona->id < conb->id) return -1;
if (cona->id > conb->id) return 1;
return 0;
}
static int _remove_client(void *key)
{
return 1;
}
static int _free_client(void *key) static int _free_client(void *key)
{ {
fserve_t *client = (fserve_t *)key; fserve_t *client = (fserve_t *)key;
......
...@@ -15,14 +15,16 @@ ...@@ -15,14 +15,16 @@
#include <stdio.h> #include <stdio.h>
typedef struct typedef struct _fserve_t
{ {
client_t *client; client_t *client;
FILE *file; FILE *file;
int offset; int offset;
int datasize; int datasize;
int ready;
unsigned char *buf; unsigned char *buf;
struct _fserve_t *next;
} fserve_t; } fserve_t;
void fserve_initialize(void); void fserve_initialize(void);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment