Commit 5b9e7f16 authored by Karl Heyes's avatar Karl Heyes

use minimal stats for inactive mountpoints that have an active fallback.

svn path=/icecast/trunk/icecast/; revision=9286
parent 018fff11
...@@ -449,7 +449,6 @@ int connection_complete_source (source_t *source) ...@@ -449,7 +449,6 @@ int connection_complete_source (source_t *source)
if (global.sources < config->source_limit) if (global.sources < config->source_limit)
{ {
char *contenttype; char *contenttype;
mount_proxy *mountproxy;
format_type_t format_type; format_type_t format_type;
/* setup format handler */ /* setup format handler */
...@@ -516,10 +515,7 @@ int connection_complete_source (source_t *source) ...@@ -516,10 +515,7 @@ int connection_complete_source (source_t *source)
} }
} }
mountproxy = config_find_mount (config, source->mount); source_update_settings (config, source);
if (mountproxy)
source_apply_mount (source, mountproxy);
config_release_config(); config_release_config();
source->shutdown_rwlock = &_source_shutdown_rwlock; source->shutdown_rwlock = &_source_shutdown_rwlock;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "event.h" #include "event.h"
#include "cfgfile.h" #include "cfgfile.h"
#include "yp.h" #include "yp.h"
#include "source.h"
#include "refbuf.h" #include "refbuf.h"
#include "client.h" #include "client.h"
...@@ -58,10 +59,10 @@ void event_config_read(void *arg) ...@@ -58,10 +59,10 @@ void event_config_read(void *arg)
config_clear(config); config_clear(config);
config_set_config(&new_config); config_set_config(&new_config);
restart_logging (config_get_config_unlocked()); restart_logging (config_get_config_unlocked());
slave_recheck();
yp_recheck_config (config_get_config_unlocked()); yp_recheck_config (config_get_config_unlocked());
config_release_config(); config_release_config();
slave_recheck_mounts();
} }
} }
...@@ -62,6 +62,7 @@ ...@@ -62,6 +62,7 @@
static void *_slave_thread(void *arg); static void *_slave_thread(void *arg);
static thread_type *_slave_thread_id; static thread_type *_slave_thread_id;
static int slave_running = 0; static int slave_running = 0;
static int update_settings = 0;
volatile static unsigned int max_interval = 0; volatile static unsigned int max_interval = 0;
volatile static int rescan_relays = 0; volatile static int rescan_relays = 0;
...@@ -106,16 +107,19 @@ relay_server *relay_copy (relay_server *r) ...@@ -106,16 +107,19 @@ relay_server *relay_copy (relay_server *r)
/* force a recheck of the relays. This will recheck the master server if /* force a recheck of the relays. This will recheck the master server if
* a this is a slave. * a this is a slave.
*/ */
void slave_recheck (void) void slave_recheck_mounts (void)
{ {
max_interval = 0; max_interval = 0;
update_settings = 1;
} }
/* rescan the current relays to see if any need starting or if any
* relay threads have terminated /* Request slave thread to check the relay list for changes and to
* update the stats for the current streams.
*/ */
void slave_rescan (void) void slave_rebuild_mounts (void)
{ {
update_settings = 1;
rescan_relays = 1; rescan_relays = 1;
} }
...@@ -233,7 +237,7 @@ static void *start_relay_stream (void *arg) ...@@ -233,7 +237,7 @@ static void *start_relay_stream (void *arg)
/* initiate an immediate relay cleanup run */ /* initiate an immediate relay cleanup run */
relay->cleanup = 1; relay->cleanup = 1;
slave_rescan(); rescan_relays = 1;
return NULL; return NULL;
} while (0); } while (0);
...@@ -250,7 +254,7 @@ static void *start_relay_stream (void *arg) ...@@ -250,7 +254,7 @@ static void *start_relay_stream (void *arg)
/* initiate an immediate relay cleanup run */ /* initiate an immediate relay cleanup run */
relay->cleanup = 1; relay->cleanup = 1;
slave_rescan(); rescan_relays = 1;
return NULL; return NULL;
} }
...@@ -288,6 +292,7 @@ static void check_relay_stream (relay_server *relay) ...@@ -288,6 +292,7 @@ static void check_relay_stream (relay_server *relay)
relay->thread = NULL; relay->thread = NULL;
relay->cleanup = 0; relay->cleanup = 0;
relay->running = 0; relay->running = 0;
update_settings = 1;
} }
} }
...@@ -305,6 +310,8 @@ static int relay_has_changed (relay_server *new, relay_server *old) ...@@ -305,6 +310,8 @@ static int relay_has_changed (relay_server *new, relay_server *old)
break; break;
if (new->port != old->port) if (new->port != old->port)
break; break;
if (new->mp3metadata != old->mp3metadata)
break;
return 0; return 0;
} while (0); } while (0);
return 1; return 1;
...@@ -383,6 +390,7 @@ static void relay_check_streams (relay_server *to_start, relay_server *to_free) ...@@ -383,6 +390,7 @@ static void relay_check_streams (relay_server *to_start, relay_server *to_free)
DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount); DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
to_free->source->running = 0; to_free->source->running = 0;
thread_join (to_free->thread); thread_join (to_free->thread);
update_settings = 1;
} }
to_free = relay_free (to_free); to_free = relay_free (to_free);
} }
...@@ -541,6 +549,11 @@ static void *_slave_thread(void *arg) ...@@ -541,6 +549,11 @@ static void *_slave_thread(void *arg)
thread_mutex_unlock (&(config_locks()->relay_lock)); thread_mutex_unlock (&(config_locks()->relay_lock));
} }
rescan_relays = 0; rescan_relays = 0;
if (update_settings)
{
update_settings = 0;
source_recheck_mounts();
}
} }
DEBUG0 ("shutting down current relays"); DEBUG0 ("shutting down current relays");
relay_check_streams (NULL, global.relays); relay_check_streams (NULL, global.relays);
......
...@@ -33,7 +33,8 @@ typedef struct _relay_server { ...@@ -33,7 +33,8 @@ typedef struct _relay_server {
void slave_initialize(void); void slave_initialize(void);
void slave_shutdown(void); void slave_shutdown(void);
void slave_recheck (void); void slave_recheck_mounts (void);
void slave_rebuild_mounts (void);
relay_server *relay_free (relay_server *relay); relay_server *relay_free (relay_server *relay);
#endif /* __SLAVE_H__ */ #endif /* __SLAVE_H__ */
...@@ -25,11 +25,11 @@ ...@@ -25,11 +25,11 @@
#ifndef _WIN32 #ifndef _WIN32
#include <unistd.h> #include <unistd.h>
#include <sys/time.h> #include <sys/time.h>
#include <sys/socket.h> #include <sys/socket.h>
#else #else
#include <winsock2.h> #include <winsock2.h>
#include <windows.h> #include <windows.h>
#define snprintf _snprintf #define snprintf _snprintf
#endif #endif
#include "thread/thread.h" #include "thread/thread.h"
...@@ -293,6 +293,7 @@ client_t *source_find_client(source_t *source, int id) ...@@ -293,6 +293,7 @@ client_t *source_find_client(source_t *source, int id)
return NULL; return NULL;
} }
/* Move clients from source to dest provided dest is running /* Move clients from source to dest provided dest is running
* and that the stream format is the same. * and that the stream format is the same.
* The only lock that should be held when this is called is the * The only lock that should be held when this is called is the
...@@ -376,8 +377,8 @@ void source_move_clients (source_t *source, source_t *dest) ...@@ -376,8 +377,8 @@ void source_move_clients (source_t *source, source_t *dest)
/* clients need to be start from somewhere in the queue /* clients need to be start from somewhere in the queue
* * so we will look for a refbuf which has been previous * so we will look for a refbuf which has been previous
* * marked as a sync point */ * marked as a sync point */
static void find_client_start (source_t *source, client_t *client) static void find_client_start (source_t *source, client_t *client)
{ {
refbuf_t *refbuf = source->burst_point; refbuf_t *refbuf = source->burst_point;
...@@ -600,6 +601,7 @@ static void source_init (source_t *source) ...@@ -600,6 +601,7 @@ static void source_init (source_t *source)
avl_tree_unlock(global.source_tree); avl_tree_unlock(global.source_tree);
} }
slave_rebuild_mounts ();
if (source->yp_public) { if (source->yp_public) {
yp_add (source); yp_add (source);
} }
...@@ -811,6 +813,7 @@ static void source_shutdown (source_t *source) ...@@ -811,6 +813,7 @@ static void source_shutdown (source_t *source)
thread_rwlock_unlock(source->shutdown_rwlock); thread_rwlock_unlock(source->shutdown_rwlock);
} }
static int _compare_clients(void *compare_arg, void *a, void *b) static int _compare_clients(void *compare_arg, void *a, void *b)
{ {
client_t *clienta = (client_t *)a; client_t *clienta = (client_t *)a;
...@@ -872,7 +875,7 @@ static void _parse_audio_info (source_t *source, const char *s) ...@@ -872,7 +875,7 @@ static void _parse_audio_info (source_t *source, const char *s)
} }
void source_apply_mount (source_t *source, mount_proxy *mountinfo) static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
{ {
DEBUG1("Applying mount information for \"%s\"", source->mount); DEBUG1("Applying mount information for \"%s\"", source->mount);
source->max_listeners = mountinfo->max_listeners; source->max_listeners = mountinfo->max_listeners;
...@@ -882,10 +885,8 @@ void source_apply_mount (source_t *source, mount_proxy *mountinfo) ...@@ -882,10 +885,8 @@ void source_apply_mount (source_t *source, mount_proxy *mountinfo)
stats_event_hidden (source->mount, NULL, source->hidden); stats_event_hidden (source->mount, NULL, source->hidden);
if (mountinfo->fallback_mount) if (mountinfo->fallback_mount)
{
source->fallback_mount = strdup (mountinfo->fallback_mount); source->fallback_mount = strdup (mountinfo->fallback_mount);
DEBUG1 ("fallback %s", mountinfo->fallback_mount);
}
if (mountinfo->auth_type != NULL) if (mountinfo->auth_type != NULL)
{ {
source->authenticator = auth_get_authenticator( source->authenticator = auth_get_authenticator(
...@@ -894,33 +895,69 @@ void source_apply_mount (source_t *source, mount_proxy *mountinfo) ...@@ -894,33 +895,69 @@ void source_apply_mount (source_t *source, mount_proxy *mountinfo)
} }
if (mountinfo->dumpfile) if (mountinfo->dumpfile)
{ {
DEBUG1("Dumping stream to %s", mountinfo->dumpfile); free (source->dumpfilename);
source->dumpfilename = strdup (mountinfo->dumpfile); source->dumpfilename = strdup (mountinfo->dumpfile);
} }
if (mountinfo->queue_size_limit) if (mountinfo->queue_size_limit)
{
source->queue_size_limit = mountinfo->queue_size_limit; source->queue_size_limit = mountinfo->queue_size_limit;
DEBUG1 ("queue size to %u", source->queue_size_limit);
}
if (mountinfo->source_timeout) if (mountinfo->source_timeout)
{
source->timeout = mountinfo->source_timeout; source->timeout = mountinfo->source_timeout;
DEBUG1 ("source timeout to %u", source->timeout);
} if (mountinfo->burst_size >= 0)
source->burst_size = (unsigned int)mountinfo->burst_size;
if (mountinfo->no_yp) if (mountinfo->no_yp)
{
source->yp_prevent = 1; source->yp_prevent = 1;
DEBUG0 ("preventing YP listings");
}
if (mountinfo->burst_size > -1)
source->burst_size = mountinfo->burst_size;
DEBUG1 ("amount to burst on client connect set to %u", source->burst_size);
if (source->format && source->format->apply_settings) if (source->format && source->format->apply_settings)
source->format->apply_settings (source->client, source->format, mountinfo); source->format->apply_settings (source->client, source->format, mountinfo);
} }
void source_update_settings (ice_config_t *config, source_t *source)
{
mount_proxy *mountinfo = config_find_mount (config, source->mount);
/* set global settings first */
source->queue_size_limit = config->queue_size_limit;
source->timeout = config->source_timeout;
source->burst_size = config->burst_size;
source->dumpfilename = NULL;
if (mountinfo)
source_apply_mount (source, mountinfo);
if (source->fallback_mount)
DEBUG1 ("fallback %s", source->fallback_mount);
if (source->dumpfilename)
DEBUG1 ("Dumping stream to %s", source->dumpfilename);
if (source->yp_prevent)
DEBUG0 ("preventing YP listings");
if (source->hidden)
{
stats_event_hidden (source->mount, NULL, 1);
DEBUG0 ("hidden from xsl");
}
else
stats_event_hidden (source->mount, NULL, 0);
if (source->max_listeners == -1)
stats_event (source->mount, "max_listeners", "unlimited");
else
{
char buf [10];
snprintf (buf, sizeof (buf), "%lu", source->max_listeners);
stats_event (source->mount, "max_listeners", buf);
}
DEBUG1 ("max listeners to %d", source->max_listeners);
DEBUG1 ("queue size to %u", source->queue_size_limit);
DEBUG1 ("burst size to %u", source->burst_size);
DEBUG1 ("source timeout to %u", source->timeout);
}
void *source_client_thread (void *arg) void *source_client_thread (void *arg)
{ {
source_t *source = arg; source_t *source = arg;
...@@ -944,6 +981,62 @@ void *source_client_thread (void *arg) ...@@ -944,6 +981,62 @@ void *source_client_thread (void *arg)
source_main (source); source_main (source);
} }
source_free_source (source); source_free_source (source);
slave_rebuild_mounts ();
return NULL; return NULL;
} }
/* rescan the mount list, so that xsl files are updated to show
* unconnected but active fallback mountpoints
*/
void source_recheck_mounts (void)
{
ice_config_t *config = config_get_config();
mount_proxy *mount = config->mounts;
avl_tree_rlock (global.source_tree);
while (mount)
{
int update_stats = 0;
int hidden;
source_t *source = source_find_mount (mount->mountname);
hidden = mount->hidden;
if (source)
{
/* something is active, maybe a fallback */
if (strcmp (source->mount, mount->mountname) == 0)
{
/* this is for inactive relays */
if (source->running == 0)
update_stats = 1;
}
else
update_stats = 1;
}
else
stats_event (mount->mountname, NULL, NULL);
if (update_stats)
{
source = source_find_mount_raw (mount->mountname);
if (source)
source_update_settings (config, source);
else
{
stats_event_hidden (mount->mountname, NULL, hidden);
stats_event (mount->mountname, "listeners", "0");
if (mount->max_listeners < 0)
stats_event (mount->mountname, "max_listeners", "unlimited");
else
stats_event_args (mount->mountname, "max_listeners", "%d", mount->max_listeners);
}
}
mount = mount->next;
}
avl_tree_unlock (global.source_tree);
config_release_config();
}
...@@ -78,7 +78,7 @@ typedef struct source_tag ...@@ -78,7 +78,7 @@ typedef struct source_tag
source_t *source_reserve (const char *mount); source_t *source_reserve (const char *mount);
void *source_client_thread (void *arg); void *source_client_thread (void *arg);
void source_apply_mount (source_t *source, mount_proxy *mountinfo); void source_update_settings (ice_config_t *config, source_t *source);
void source_clear_source (source_t *source); void source_clear_source (source_t *source);
source_t *source_find_mount(const char *mount); source_t *source_find_mount(const char *mount);
source_t *source_find_mount_raw(const char *mount); source_t *source_find_mount_raw(const char *mount);
...@@ -88,6 +88,7 @@ void source_free_source(source_t *source); ...@@ -88,6 +88,7 @@ void source_free_source(source_t *source);
void source_move_clients (source_t *source, source_t *dest); void source_move_clients (source_t *source, source_t *dest);
int source_remove_client(void *key); int source_remove_client(void *key);
void source_main(source_t *source); void source_main(source_t *source);
void source_recheck_mounts (void);
extern mutex_t move_clients_mutex; extern mutex_t move_clients_mutex;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment