Commit f7afa976 authored by Karl Heyes's avatar Karl Heyes

update queue handling for stats. This was slow when many stats were being

queued. These apply to both web interface requests and stats clients.

svn path=/icecast/trunk/icecast/; revision=10370
parent 28000616
......@@ -51,10 +51,18 @@
#define STATS_EVENT_REMOVE 4
#define STATS_EVENT_HIDDEN 5
typedef struct _event_queue_tag
{
volatile stats_event_t *head;
volatile stats_event_t **tail;
} event_queue_t;
#define event_queue_init(qp) { (qp)->head = NULL; (qp)->tail = &(qp)->head; }
typedef struct _event_listener_tag
{
stats_event_t **queue;
mutex_t *mutex;
event_queue_t queue;
mutex_t mutex;
struct _event_listener_tag *next;
} event_listener_t;
......@@ -66,7 +74,7 @@ static volatile int _stats_threads = 0;
static stats_t _stats;
static mutex_t _stats_mutex;
static volatile stats_event_t *_global_event_queue;
static event_queue_t _global_event_queue;
mutex_t _global_event_mutex;
static volatile event_listener_t *_event_listeners;
......@@ -77,10 +85,11 @@ static int _compare_stats(void *a, void *b, void *arg);
static int _compare_source_stats(void *a, void *b, void *arg);
static int _free_stats(void *key);
static int _free_source_stats(void *key);
static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue);
static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue);
static stats_node_t *_find_node(avl_tree *tree, char *name);
static stats_source_t *_find_source(avl_tree *tree, char *source);
static void _free_event(stats_event_t *event);
static stats_event_t *_get_event_from_queue (event_queue_t *queue);
/* simple helper function for creating an event */
......@@ -106,19 +115,7 @@ static stats_event_t *build_event (const char *source, const char *name, const c
static void queue_global_event (stats_event_t *event)
{
thread_mutex_lock(&_global_event_mutex);
if (_global_event_queue == NULL)
{
_global_event_queue = event;
}
else
{
stats_event_t *node = (stats_event_t *)_global_event_queue;
while (node->next)
node = node->next;
node->next = event;
}
/* DEBUG3("event added (%s, %s, %s)", event->source,
event->name, event->value); */
_add_event_to_queue (event, &_global_event_queue);
thread_mutex_unlock(&_global_event_mutex);
}
......@@ -134,7 +131,7 @@ void stats_initialize()
thread_mutex_create(&_stats_mutex);
/* set up stats queues */
_global_event_queue = NULL;
event_queue_init (&_global_event_queue);
thread_mutex_create(&_global_event_mutex);
/* fire off the stats thread */
......@@ -145,7 +142,6 @@ void stats_initialize()
void stats_shutdown()
{
int n;
stats_event_t *event, *next;
if(!_stats_running) /* We can't shutdown if we're not running. */
return;
......@@ -172,17 +168,17 @@ void stats_shutdown()
avl_tree_free(_stats.source_tree, _free_source_stats);
avl_tree_free(_stats.global_tree, _free_stats);
event = (stats_event_t *)_global_event_queue;
while(event) {
while (1)
{
stats_event_t *event = _get_event_from_queue (&_global_event_queue);
if (event == NULL) break;
if(event->source)
free(event->source);
if(event->value)
free(event->value);
if(event->name)
free(event->name);
next = event->next;
free(event);
event = next;
}
}
......@@ -537,7 +533,7 @@ static void process_source_event (stats_event_t *event)
void stats_event_time (const char *mount, const char *name)
{
time_t now = time(NULL);
struct tm local;
struct tm local;
char buffer[100];
localtime_r (&now, &local);
......@@ -571,11 +567,10 @@ static void *_stats_thread(void *arg)
INFO0 ("stats thread started");
while (_stats_running) {
if (_global_event_queue != NULL) {
if (_global_event_queue.head != NULL) {
/* grab the next event from the queue */
thread_mutex_lock(&_global_event_mutex);
event = (stats_event_t *)_global_event_queue;
_global_event_queue = event->next;
event = _get_event_from_queue (&_global_event_queue);
thread_mutex_unlock(&_global_event_mutex);
event->next = NULL;
......@@ -593,9 +588,9 @@ static void *_stats_thread(void *arg)
listener = (event_listener_t *)_event_listeners;
while (listener) {
copy = _copy_event(event);
thread_mutex_lock(listener->mutex);
_add_event_to_queue(copy, listener->queue);
thread_mutex_unlock(listener->mutex);
thread_mutex_lock (&listener->mutex);
_add_event_to_queue (copy, &listener->queue);
thread_mutex_unlock (&listener->mutex);
listener = listener->next;
}
......@@ -614,16 +609,15 @@ static void *_stats_thread(void *arg)
}
/* you must have the _stats_mutex locked here */
static void _unregister_listener(stats_event_t **queue)
static void _unregister_listener(event_listener_t *listener)
{
event_listener_t **prev = (event_listener_t **)&_event_listeners,
*current = *prev;
while (current)
{
if (current->queue == queue)
if (current == listener)
{
*prev = current->next;
free (current);
break;
}
prev = &current->next;
......@@ -632,25 +626,6 @@ static void _unregister_listener(stats_event_t **queue)
}
/* you must have the _stats_mutex locked here */
static void _register_listener(stats_event_t **queue, mutex_t *mutex)
{
event_listener_t *node;
event_listener_t *evli = (event_listener_t *)malloc(sizeof(event_listener_t));
evli->queue = queue;
evli->mutex = mutex;
evli->next = NULL;
if (_event_listeners == NULL) {
_event_listeners = evli;
} else {
node = (event_listener_t *)_event_listeners;
while (node->next) node = node->next;
node->next = evli;
}
}
static stats_event_t *_make_event_from_node(stats_node_t *node, char *source)
{
stats_event_t *event = (stats_event_t *)malloc(sizeof(stats_event_t));
......@@ -668,35 +643,32 @@ static stats_event_t *_make_event_from_node(stats_node_t *node, char *source)
return event;
}
static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue)
{
stats_event_t *node;
if (*queue == NULL) {
*queue = event;
} else {
node = *queue;
while (node->next) node = node->next;
node->next = event;
}
static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue)
{
*queue->tail = event;
queue->tail = (volatile stats_event_t **)&event->next;
}
static stats_event_t *_get_event_from_queue(stats_event_t **queue)
{
stats_event_t *event;
if (*queue == NULL) return NULL;
static stats_event_t *_get_event_from_queue (event_queue_t *queue)
{
stats_event_t *event = NULL;
event = *queue;
*queue = (*queue)->next;
event->next = NULL;
if (queue && queue->head)
{
event = (stats_event_t *)queue->head;
queue->head = event->next;
if (queue->head == NULL)
queue->tail = &queue->head;
}
return event;
}
static int _send_event_to_client(stats_event_t *event, client_t *client)
{
int ret = -1, len;
int len;
char buf [200];
/* send data to the client!!!! */
......@@ -705,12 +677,15 @@ static int _send_event_to_client(stats_event_t *event, client_t *client)
event->name ? event->name : "null",
event->value ? event->value : "null");
if (len > 0 && len < (int)sizeof (buf))
ret = client_send_bytes (client, buf, len);
return (ret == -1) ? 0 : 1;
{
client_send_bytes (client, buf, len);
if (client->con->error)
return -1;
}
return 0;
}
void _dump_stats_to_queue(stats_event_t **queue)
void _dump_stats_to_queue (event_queue_t *queue)
{
avl_node *node;
avl_node *node2;
......@@ -750,7 +725,7 @@ void _dump_stats_to_queue(stats_event_t **queue)
** the queue for all new events atomically.
** note: mutex must already be created!
*/
static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex)
static void _register_listener (event_listener_t *listener)
{
avl_node *node;
avl_node *node2;
......@@ -765,7 +740,7 @@ static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex)
node = avl_get_first(_stats.global_tree);
while (node) {
event = _make_event_from_node((stats_node_t *)node->key, NULL);
_add_event_to_queue(event, queue);
_add_event_to_queue (event, &listener->queue);
node = avl_get_next(node);
}
......@@ -777,7 +752,7 @@ static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex)
node2 = avl_get_first(source->stats_tree);
while (node2) {
event = _make_event_from_node((stats_node_t *)node2->key, source->source);
_add_event_to_queue(event, queue);
_add_event_to_queue (event, &listener->queue);
node2 = avl_get_next(node2);
}
......@@ -786,7 +761,8 @@ static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex)
}
/* now we register to receive future event notices */
_register_listener(queue, mutex);
listener->next = (event_listener_t *)_event_listeners;
_event_listeners = listener;
thread_mutex_unlock(&_stats_mutex);
}
......@@ -794,48 +770,44 @@ static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex)
void *stats_connection(void *arg)
{
client_t *client = (client_t *)arg;
stats_event_t *local_event_queue = NULL;
mutex_t local_event_mutex;
stats_event_t *event;
event_listener_t listener;
INFO0 ("stats client starting");
event_queue_init (&listener.queue);
/* increment the thread count */
thread_mutex_lock(&_stats_mutex);
_stats_threads++;
stats_event_args (NULL, "stats", "%d", _stats_threads);
thread_mutex_unlock(&_stats_mutex);
thread_mutex_create(&local_event_mutex);
thread_mutex_create (&(listener.mutex));
_atomic_get_and_register(&local_event_queue, &local_event_mutex);
_register_listener (&listener);
while (_stats_running) {
thread_mutex_lock(&local_event_mutex);
event = _get_event_from_queue(&local_event_queue);
thread_mutex_lock (&listener.mutex);
event = _get_event_from_queue (&listener.queue);
thread_mutex_unlock (&listener.mutex);
if (event != NULL) {
if (!_send_event_to_client(event, client)) {
if (_send_event_to_client(event, client) < 0) {
_free_event(event);
thread_mutex_unlock(&local_event_mutex);
break;
}
_free_event(event);
} else {
thread_mutex_unlock(&local_event_mutex);
thread_sleep (500000);
continue;
}
thread_mutex_unlock(&local_event_mutex);
thread_sleep (500000);
}
thread_mutex_lock(&_stats_mutex);
_unregister_listener (&local_event_queue);
_unregister_listener (&listener);
_stats_threads--;
stats_event_args (NULL, "stats", "%d", _stats_threads);
thread_mutex_unlock(&_stats_mutex);
thread_mutex_destroy(&local_event_mutex);
thread_mutex_destroy (&listener.mutex);
client_destroy (client);
INFO0 ("stats client finished");
......@@ -916,19 +888,18 @@ void stats_transform_xslt(client_t *client, const char *uri)
void stats_get_xml(xmlDocPtr *doc, int show_hidden)
{
stats_event_t *event;
stats_event_t *queue;
event_queue_t queue;
xmlNodePtr node, srcnode;
source_xml_t *src_nodes = NULL;
source_xml_t *next;
queue = NULL;
_dump_stats_to_queue(&queue);
event_queue_init (&queue);
_dump_stats_to_queue (&queue);
*doc = xmlNewDoc("1.0");
node = xmlNewDocNode(*doc, NULL, "icestats", NULL);
xmlDocSetRootElement(*doc, node);
event = _get_event_from_queue(&queue);
while (event)
{
......@@ -961,7 +932,7 @@ void stats_sendxml(client_t *client)
{
int bytes;
stats_event_t *event;
stats_event_t *queue;
event_queue_t queue;
xmlDocPtr doc;
xmlNodePtr node, srcnode;
int len;
......@@ -969,8 +940,8 @@ void stats_sendxml(client_t *client)
source_xml_t *snd;
source_xml_t *src_nodes = NULL;
queue = NULL;
_dump_stats_to_queue(&queue);
event_queue_init (&queue);
_dump_stats_to_queue (&queue);
doc = xmlNewDoc("1.0");
node = xmlNewDocNode(doc, NULL, "icestats", NULL);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment