Commit 17537d24 authored by Karl Heyes's avatar Karl Heyes

When starting relay threads, have the relay thread do the connection not the

slave thread.  Also improve cleanup handling and log messages as well

svn path=/icecast/trunk/icecast/; revision=8068
parent 02471ae6
......@@ -62,7 +62,8 @@
static void *_slave_thread(void *arg);
thread_type *_slave_thread_id;
static int slave_running = 0;
static int max_interval = 0;
static unsigned int max_interval = 0;
static int rescan_relays = 0;
relay_server *relay_free (relay_server *relay)
{
......@@ -73,7 +74,7 @@ relay_server *relay_free (relay_server *relay)
xmlFree (relay->server);
xmlFree (relay->mount);
xmlFree (relay->localmount);
xmlFree (relay);
free (relay);
return next;
}
......@@ -94,26 +95,20 @@ relay_server *relay_copy (relay_server *r)
}
static void *_relay_thread (void *arg)
/* force a recheck of the relays. This will recheck the master server if
* a this is a slave.
*/
void slave_recheck (void)
{
relay_server *relay = arg;
relay->running = 1;
stats_event_inc(NULL, "source_relay_connections");
source_main (relay->source);
relay->running = 0;
if (relay->cleanup)
relay_free (relay);
return NULL;
max_interval = 0;
}
void slave_recheck (void)
/* rescan the current relays to see if any need starting or if any
* relay threads have terminated
*/
void slave_rescan (void)
{
max_interval = 0;
rescan_relays = 1;
}
......@@ -129,36 +124,27 @@ void slave_initialize(void)
void slave_shutdown(void)
{
relay_server *relay;
if (!slave_running)
return;
slave_running = 0;
DEBUG0 ("waiting for slave thread");
thread_join (_slave_thread_id);
relay = global.relays;
while (relay)
relay = relay_free (relay);
global.relays = NULL;
relay = global.master_relays;
while (relay)
relay = relay_free (relay);
global.master_relays = NULL;
}
/* This does the actual connection for a relay. A thread is
* started off if a connection can be acquired
*/
static void start_relay_stream (relay_server *relay)
static void *start_relay_stream (void *arg)
{
relay_server *relay = arg;
sock_t streamsock = SOCK_ERROR;
source_t *src = relay->source;
http_parser_t *parser = NULL;
connection_t *con=NULL;
char header[4096];
relay->running = 1;
INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
do
{
......@@ -206,9 +192,15 @@ static void start_relay_stream (relay_server *relay)
DEBUG0("Failed to complete source initialisation");
break;
}
thread_create ("Relay Thread", _relay_thread, relay, THREAD_DETACHED);
stats_event_inc(NULL, "source_relay_connections");
return;
source_main (relay->source);
/* initiate an immediate relay cleanup run */
relay->cleanup = 1;
slave_rescan();
return NULL;
} while (0);
if (con == NULL && streamsock != SOCK_ERROR)
......@@ -220,6 +212,12 @@ static void start_relay_stream (relay_server *relay)
httpp_destroy (parser);
src->parser = NULL;
source_clear_source (relay->source);
/* initiate an immediate relay cleanup run */
relay->cleanup = 1;
slave_rescan();
return NULL;
}
......@@ -228,13 +226,33 @@ static void check_relay_stream (relay_server *relay)
{
if (relay->source == NULL)
{
if (relay->localmount[0] != '/')
{
WARN1 ("relay mountpoint \"%s\" does not start with /, skipping",
relay->localmount);
return;
}
/* new relay, reserve the name */
DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
relay->source = source_reserve (relay->localmount);
if (relay->source)
DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
else
WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
}
if (relay->source && !relay->running)
{
start_relay_stream (relay);
relay->thread = thread_create ("Relay Thread", start_relay_stream,
relay, THREAD_ATTACHED);
return;
}
/* the relay thread may of close down */
if (relay->cleanup && relay->thread)
{
DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
thread_join (relay->thread);
relay->thread = NULL;
relay->cleanup = 0;
relay->running = 0;
}
}
......@@ -257,6 +275,7 @@ update_relay_set (relay_server **current, relay_server *updated)
while (existing_relay)
{
/* break out if keeping relay */
if (strcmp (relay->localmount, existing_relay->localmount) == 0)
break;
existing_p = &existing_relay->next;
......@@ -281,33 +300,44 @@ update_relay_set (relay_server **current, relay_server *updated)
/* update the relay_list with entries from new_relay_list. Any new relays
* are added to the list, and any not listed in the provided new_relay_list
* get marked for shutting down, just in case they are not shutting down by
* themselves
* are separated and returned in a separate list
*/
static void
static relay_server *
update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
relay_server *relay, *current;
relay_server *active_relays, *cleanup_relays;
current = update_relay_set (relay_list, new_relay_list);
active_relays = update_relay_set (relay_list, new_relay_list);
/* ok whats left, lets make sure they shut down */
relay = *relay_list;
while (relay)
cleanup_relays = *relay_list;
/* re-assign new set */
*relay_list = active_relays;
return cleanup_relays;
}
static void relay_check_streams (relay_server *to_start, relay_server *to_free)
{
relay_server *relay;
while (to_free)
{
relay->cleanup = 1;
if (relay->source)
if (to_free->running && to_free->source)
{
if (relay->source->running)
DEBUG1 ("requested %s to shut down", relay->source->mount);
relay->source->running = 0;
relay = relay->next;
DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
to_free->source->running = 0;
thread_join (to_free->thread);
}
else
relay = relay_free (relay);
to_free = relay_free (to_free);
}
relay = to_start;
while (relay)
{
check_relay_stream (relay);
relay = relay->next;
}
/* re-assign new set */
*relay_list = current;
}
......@@ -321,7 +351,7 @@ static int update_from_master(ice_config_t *config)
do
{
char *authheader, *data;
relay_server *relays = NULL, *relay;
relay_server *new_relays = NULL, *cleanup_relays;
int len, count = 1;
username = strdup ("relay");
......@@ -345,11 +375,9 @@ static int update_from_master(ice_config_t *config)
break;
}
len = strlen(username) + strlen(password) + 1;
authheader = malloc(len+1);
strcpy(authheader, username);
strcat(authheader, ":");
strcat(authheader, password);
len = strlen(username) + strlen(password) + 2;
authheader = malloc(len);
snprintf (authheader, len, "%s:%s", username, password);
data = util_base64_encode(authheader);
sock_write (mastersock,
"GET /admin/streamlist.txt HTTP/1.0\r\n"
......@@ -385,23 +413,20 @@ static int update_from_master(ice_config_t *config)
r->mount = xmlStrdup (buf);
r->localmount = xmlStrdup (buf);
r->mp3metadata = 1;
r->next = relays;
relays = r;
r->next = new_relays;
new_relays = r;
}
}
sock_close (mastersock);
update_relays (&global.master_relays, relays);
/* start any inactive relays */
relay = global.master_relays;
while (relay)
{
check_relay_stream (relay);
relay = relay->next;
}
relay = relays;
while (relay)
relay = relay_free (relay);
thread_mutex_lock (&(config_locks()->relay_lock));
cleanup_relays = update_relays (&global.master_relays, new_relays);
relay_check_streams (global.master_relays, cleanup_relays);
relay_check_streams (NULL, new_relays);
thread_mutex_unlock (&(config_locks()->relay_lock));
} while(0);
if (master)
......@@ -418,39 +443,52 @@ static int update_from_master(ice_config_t *config)
static void *_slave_thread(void *arg)
{
ice_config_t *config;
relay_server *relay;
unsigned interval = 0;
unsigned int interval = 0;
while (slave_running)
{
relay_server *cleanup_relays;
thread_sleep (1000000);
if (max_interval > ++interval)
if (rescan_relays == 0 && max_interval > ++interval)
continue;
interval = 0;
config = config_get_config();
/* only update relays lists when required */
if (max_interval <= interval)
{
DEBUG0 ("checking master stream list");
config = config_get_config();
max_interval = config->master_update_interval;
interval = 0;
max_interval = config->master_update_interval;
/* the connection could time some time, so the lock can drop */
if (update_from_master (config))
config = config_get_config();
/* the connection could take some time, so the lock can drop */
if (update_from_master (config))
config = config_get_config();
thread_mutex_lock (&(config_locks()->relay_lock));
thread_mutex_lock (&(config_locks()->relay_lock));
update_relays (&global.relays, config->relay);
cleanup_relays = update_relays (&global.relays, config->relay);
config_release_config();
config_release_config();
/* start any inactive relays */
relay = global.relays;
while (relay)
relay_check_streams (global.relays, cleanup_relays);
thread_mutex_unlock (&(config_locks()->relay_lock));
}
else
{
check_relay_stream (relay);
relay = relay->next;
DEBUG0 ("rescanning relay lists");
thread_mutex_lock (&(config_locks()->relay_lock));
relay_check_streams (global.master_relays, NULL);
relay_check_streams (global.relays, NULL);
thread_mutex_unlock (&(config_locks()->relay_lock));
}
thread_mutex_unlock (&(config_locks()->relay_lock));
rescan_relays = 0;
}
DEBUG0 ("shutting down current relays");
relay_check_streams (NULL, global.relays);
relay_check_streams (NULL, global.master_relays);
INFO0 ("Slave thread shutdown complete");
return NULL;
......
......@@ -13,6 +13,8 @@
#ifndef __SLAVE_H__
#define __SLAVE_H__
#include <thread/thread.h>
typedef struct _relay_server {
char *server;
int port;
......@@ -22,6 +24,7 @@ typedef struct _relay_server {
int mp3metadata;
int running;
int cleanup;
thread_type *thread;
struct _relay_server *next;
} relay_server;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment