Commit 0dc26558 authored by Karl Heyes's avatar Karl Heyes

slave handler update. add timestamps to relays, allows slave thread to

process them better. This simplifies various checks and sits better with
relay startup and relay cleanup in certain error cases.

svn path=/icecast/trunk/icecast/; revision=11008
parent 9e961203
...@@ -318,7 +318,6 @@ static int add_client_to_source (source_t *source, client_t *client) ...@@ -318,7 +318,6 @@ static int add_client_to_source (source_t *source, client_t *client)
/* enable on-demand relay to start, wake up the slave thread */ /* enable on-demand relay to start, wake up the slave thread */
DEBUG0("kicking off on-demand relay"); DEBUG0("kicking off on-demand relay");
source->on_demand_req = 1; source->on_demand_req = 1;
slave_rescan ();
} }
DEBUG1 ("Added client to %s", source->mount); DEBUG1 ("Added client to %s", source->mount);
return 0; return 0;
......
...@@ -35,7 +35,7 @@ typedef enum ...@@ -35,7 +35,7 @@ typedef enum
AUTH_FORBIDDEN, AUTH_FORBIDDEN,
AUTH_USERADDED, AUTH_USERADDED,
AUTH_USEREXISTS, AUTH_USEREXISTS,
AUTH_USERDELETED, AUTH_USERDELETED
} auth_result; } auth_result;
typedef struct auth_client_tag typedef struct auth_client_tag
......
...@@ -65,7 +65,6 @@ static thread_type *_slave_thread_id; ...@@ -65,7 +65,6 @@ static thread_type *_slave_thread_id;
static int slave_running = 0; static int slave_running = 0;
static int update_settings = 0; static int update_settings = 0;
static volatile unsigned int max_interval = 0; static volatile unsigned int max_interval = 0;
static volatile int rescan_relays = 0;
relay_server *relay_free (relay_server *relay) relay_server *relay_free (relay_server *relay)
{ {
...@@ -116,22 +115,12 @@ void slave_recheck_mounts (void) ...@@ -116,22 +115,12 @@ void slave_recheck_mounts (void)
} }
/* Request slave thread to rescan the existing relays to see if any need
* starting up, eg on-demand relays
*/
void slave_rescan (void)
{
rescan_relays = 1;
}
/* Request slave thread to check the relay list for changes and to /* Request slave thread to check the relay list for changes and to
* update the stats for the current streams. * update the stats for the current streams.
*/ */
void slave_rebuild_mounts (void) void slave_rebuild_mounts (void)
{ {
update_settings = 1; update_settings = 1;
rescan_relays = 1;
} }
...@@ -249,6 +238,7 @@ static void *start_relay_stream (void *arg) ...@@ -249,6 +238,7 @@ static void *start_relay_stream (void *arg)
break; break;
} }
global_unlock (); global_unlock ();
sock_set_blocking (streamsock, SOCK_NONBLOCK);
con = NULL; con = NULL;
parser = NULL; parser = NULL;
client_set_queue (src->client, NULL); client_set_queue (src->client, NULL);
...@@ -268,11 +258,11 @@ static void *start_relay_stream (void *arg) ...@@ -268,11 +258,11 @@ static void *start_relay_stream (void *arg)
/* only keep refreshing YP entries for inactive on-demand relays */ /* only keep refreshing YP entries for inactive on-demand relays */
yp_remove (relay->localmount); yp_remove (relay->localmount);
relay->source->yp_public = -1; relay->source->yp_public = -1;
relay->start = time(NULL) + 10; /* prevent busy looping if failing */
} }
/* initiate an immediate relay cleanup run */ /* we've finished, now get cleaned up */
relay->cleanup = 1; relay->cleanup = 1;
rescan_relays = 1;
return NULL; return NULL;
} while (0); } while (0);
...@@ -299,9 +289,9 @@ static void *start_relay_stream (void *arg) ...@@ -299,9 +289,9 @@ static void *start_relay_stream (void *arg)
src->parser = NULL; src->parser = NULL;
source_clear_source (relay->source); source_clear_source (relay->source);
/* initiate an immediate relay cleanup run */ /* cleanup relay, but prevent this relay from starting up again too soon */
relay->start = time(NULL) + max_interval;
relay->cleanup = 1; relay->cleanup = 1;
rescan_relays = 1;
return NULL; return NULL;
} }
...@@ -321,25 +311,21 @@ static void check_relay_stream (relay_server *relay) ...@@ -321,25 +311,21 @@ static void check_relay_stream (relay_server *relay)
/* new relay, reserve the name */ /* new relay, reserve the name */
relay->source = source_reserve (relay->localmount); relay->source = source_reserve (relay->localmount);
if (relay->source) if (relay->source)
{
DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount); DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
slave_rebuild_mounts();
}
else else
WARN1 ("new relay but source \"%s\" already exists", relay->localmount); WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
} }
do do
{ {
source_t *source = relay->source; source_t *source = relay->source;
if (relay->source == NULL || relay->running) /* skip relay if active, not configured or just not time yet */
if (relay->source == NULL || relay->running || relay->start > time(NULL))
break; break;
if (relay->on_demand) if (relay->on_demand && source->on_demand_req == 0)
{ {
ice_config_t *config = config_get_config ();
mount_proxy *mountinfo = config_find_mount (config, relay->localmount);
if (mountinfo == NULL)
source_update_settings (config, relay->source, mountinfo);
config_release_config ();
slave_rebuild_mounts();
stats_event (relay->localmount, "listeners", "0");
relay->source->on_demand = relay->on_demand; relay->source->on_demand = relay->on_demand;
if (source->fallback_mount && source->fallback_override) if (source->fallback_mount && source->fallback_override)
...@@ -359,17 +345,21 @@ static void check_relay_stream (relay_server *relay) ...@@ -359,17 +345,21 @@ static void check_relay_stream (relay_server *relay)
break; break;
} }
relay->start = time(NULL) + 5;
relay->thread = thread_create ("Relay Thread", start_relay_stream, relay->thread = thread_create ("Relay Thread", start_relay_stream,
relay, THREAD_ATTACHED); relay, THREAD_ATTACHED);
return; return;
} while(0); } while (0);
/* the relay thread may of shut down itself */ /* the relay thread may of shut down itself */
if (relay->cleanup && relay->thread) if (relay->cleanup)
{ {
DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount); if (relay->thread)
thread_join (relay->thread); {
relay->thread = NULL; DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
thread_join (relay->thread);
relay->thread = NULL;
}
relay->cleanup = 0; relay->cleanup = 0;
relay->running = 0; relay->running = 0;
...@@ -469,7 +459,8 @@ update_relays (relay_server **relay_list, relay_server *new_relay_list) ...@@ -469,7 +459,8 @@ update_relays (relay_server **relay_list, relay_server *new_relay_list)
} }
static void relay_check_streams (relay_server *to_start, relay_server *to_free) static void relay_check_streams (relay_server *to_start,
relay_server *to_free, int skip_timer)
{ {
relay_server *relay; relay_server *relay;
...@@ -494,6 +485,8 @@ static void relay_check_streams (relay_server *to_start, relay_server *to_free) ...@@ -494,6 +485,8 @@ static void relay_check_streams (relay_server *to_start, relay_server *to_free)
relay = to_start; relay = to_start;
while (relay) while (relay)
{ {
if (skip_timer)
relay->start = 0;
check_relay_stream (relay); check_relay_stream (relay);
relay = relay->next; relay = relay->next;
} }
...@@ -584,8 +577,8 @@ static int update_from_master(ice_config_t *config) ...@@ -584,8 +577,8 @@ static int update_from_master(ice_config_t *config)
thread_mutex_lock (&(config_locks()->relay_lock)); thread_mutex_lock (&(config_locks()->relay_lock));
cleanup_relays = update_relays (&global.master_relays, new_relays); cleanup_relays = update_relays (&global.master_relays, new_relays);
relay_check_streams (global.master_relays, cleanup_relays); relay_check_streams (global.master_relays, cleanup_relays, 0);
relay_check_streams (NULL, new_relays); relay_check_streams (NULL, new_relays, 0);
thread_mutex_unlock (&(config_locks()->relay_lock)); thread_mutex_unlock (&(config_locks()->relay_lock));
...@@ -611,7 +604,8 @@ static void *_slave_thread(void *arg) ...@@ -611,7 +604,8 @@ static void *_slave_thread(void *arg)
while (1) while (1)
{ {
relay_server *cleanup_relays; relay_server *cleanup_relays = NULL;
int skip_timer = 0;
/* re-read xml file if requested */ /* re-read xml file if requested */
if (global . schedule_config_reread) if (global . schedule_config_reread)
...@@ -623,8 +617,8 @@ static void *_slave_thread(void *arg) ...@@ -623,8 +617,8 @@ static void *_slave_thread(void *arg)
thread_sleep (1000000); thread_sleep (1000000);
if (slave_running == 0) if (slave_running == 0)
break; break;
if (rescan_relays == 0 && max_interval > ++interval)
continue; ++interval;
/* only update relays lists when required */ /* only update relays lists when required */
if (max_interval <= interval) if (max_interval <= interval)
...@@ -632,6 +626,8 @@ static void *_slave_thread(void *arg) ...@@ -632,6 +626,8 @@ static void *_slave_thread(void *arg)
DEBUG0 ("checking master stream list"); DEBUG0 ("checking master stream list");
config = config_get_config(); config = config_get_config();
if (max_interval == 0)
skip_timer = 1;
interval = 0; interval = 0;
max_interval = config->master_update_interval; max_interval = config->master_update_interval;
...@@ -644,19 +640,14 @@ static void *_slave_thread(void *arg) ...@@ -644,19 +640,14 @@ static void *_slave_thread(void *arg)
cleanup_relays = update_relays (&global.relays, config->relay); cleanup_relays = update_relays (&global.relays, config->relay);
config_release_config(); config_release_config();
relay_check_streams (global.relays, cleanup_relays);
thread_mutex_unlock (&(config_locks()->relay_lock));
} }
else else
{
DEBUG0 ("rescanning relay lists");
thread_mutex_lock (&(config_locks()->relay_lock)); thread_mutex_lock (&(config_locks()->relay_lock));
relay_check_streams (global.master_relays, NULL);
relay_check_streams (global.relays, NULL); relay_check_streams (global.relays, cleanup_relays, skip_timer);
thread_mutex_unlock (&(config_locks()->relay_lock)); relay_check_streams (global.master_relays, NULL, skip_timer);
} thread_mutex_unlock (&(config_locks()->relay_lock));
rescan_relays = 0;
if (update_settings) if (update_settings)
{ {
update_settings = 0; update_settings = 0;
...@@ -664,8 +655,8 @@ static void *_slave_thread(void *arg) ...@@ -664,8 +655,8 @@ static void *_slave_thread(void *arg)
} }
} }
DEBUG0 ("shutting down current relays"); DEBUG0 ("shutting down current relays");
relay_check_streams (NULL, global.relays); relay_check_streams (NULL, global.relays, 0);
relay_check_streams (NULL, global.master_relays); relay_check_streams (NULL, global.master_relays, 0);
INFO0 ("Slave thread shutdown complete"); INFO0 ("Slave thread shutdown complete");
......
...@@ -27,6 +27,7 @@ typedef struct _relay_server { ...@@ -27,6 +27,7 @@ typedef struct _relay_server {
int on_demand; int on_demand;
int running; int running;
int cleanup; int cleanup;
time_t start;
thread_type *thread; thread_type *thread;
struct _relay_server *next; struct _relay_server *next;
} relay_server; } relay_server;
...@@ -36,7 +37,6 @@ void slave_initialize(void); ...@@ -36,7 +37,6 @@ void slave_initialize(void);
void slave_shutdown(void); void slave_shutdown(void);
void slave_recheck_mounts (void); void slave_recheck_mounts (void);
void slave_rebuild_mounts (void); void slave_rebuild_mounts (void);
void slave_rescan (void);
relay_server *relay_free (relay_server *relay); relay_server *relay_free (relay_server *relay);
#endif /* __SLAVE_H__ */ #endif /* __SLAVE_H__ */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment