Commit ae825afd authored by Karl Heyes's avatar Karl Heyes

Make the slave thread reserve relay mountpoints, and prevent reconnection

when a relay is currently active.

svn path=/trunk/icecast/; revision=5845
parent a83553ad
...@@ -32,15 +32,6 @@ typedef struct ice_config_dir_tag ...@@ -32,15 +32,6 @@ typedef struct ice_config_dir_tag
struct ice_config_dir_tag *next; struct ice_config_dir_tag *next;
} ice_config_dir_t; } ice_config_dir_t;
typedef struct _relay_server {
char *server;
int port;
char *mount;
char *localmount;
int mp3metadata;
struct _relay_server *next;
} relay_server;
typedef struct _config_options { typedef struct _config_options {
char *name; char *name;
char *value; char *value;
......
...@@ -479,6 +479,7 @@ int connection_complete_source (source_t *source) ...@@ -479,6 +479,7 @@ int connection_complete_source (source_t *source)
global.sources++; global.sources++;
global_unlock(); global_unlock();
stats_event_inc(NULL, "sources");
/* for relays, we don't yet have a client, however we do require one /* for relays, we don't yet have a client, however we do require one
* to retrieve the stream from. This is created here, quite late, * to retrieve the stream from. This is created here, quite late,
...@@ -578,10 +579,28 @@ int connection_create_source(client_t *client, connection_t *con, http_parser_t ...@@ -578,10 +579,28 @@ int connection_create_source(client_t *client, connection_t *con, http_parser_t
} }
config_release_config(); config_release_config();
/* we need to add this source into the tree but fail if this mountpoint
* already exists
*/
avl_tree_wlock(global.source_tree);
if (source_find_mount_raw (mount) != NULL)
{
avl_tree_unlock(global.source_tree);
global_lock();
global.sources--;
global_unlock();
stats_event_dec(NULL, "sources");
INFO1("source \"%s\" already in use", mount);
client_send_404 (client, "Mountpoint in use");
return 0;
}
avl_insert(global.source_tree, (void *)source);
avl_tree_unlock(global.source_tree);
source->send_return = 1; source->send_return = 1;
source->shutdown_rwlock = &_source_shutdown_rwlock; source->shutdown_rwlock = &_source_shutdown_rwlock;
sock_set_blocking(con->sock, SOCK_NONBLOCK); sock_set_blocking(con->sock, SOCK_NONBLOCK);
thread_create("Source Thread", source_main, (void *)source, THREAD_DETACHED); thread_create("Source Thread", source_client_thread, (void *)source, THREAD_DETACHED);
return 1; return 1;
fail: fail:
...@@ -775,9 +794,7 @@ static void _handle_source_request(connection_t *con, ...@@ -775,9 +794,7 @@ static void _handle_source_request(connection_t *con,
} }
avl_tree_unlock(global.source_tree); avl_tree_unlock(global.source_tree);
if (!connection_create_source(client, con, parser, uri)) { connection_create_source(client, con, parser, uri);
client_send_404(client, "Mountpoint in use");
}
} }
static void _handle_stats_request(connection_t *con, static void _handle_stats_request(connection_t *con,
......
...@@ -37,6 +37,8 @@ void global_initialize(void) ...@@ -37,6 +37,8 @@ void global_initialize(void)
{ {
memset(global.serversock, 0, sizeof(int)*MAX_LISTEN_SOCKETS); memset(global.serversock, 0, sizeof(int)*MAX_LISTEN_SOCKETS);
global.server_sockets = 0; global.server_sockets = 0;
global.relays = NULL;
global.master_relays = NULL;
global.running = 0; global.running = 0;
global.clients = 0; global.clients = 0;
global.sources = 0; global.sources = 0;
...@@ -47,7 +49,7 @@ void global_initialize(void) ...@@ -47,7 +49,7 @@ void global_initialize(void)
void global_shutdown(void) void global_shutdown(void)
{ {
thread_mutex_destroy(&_global_mutex); thread_mutex_destroy(&_global_mutex);
avl_tree_free(global.source_tree, source_free_source); avl_tree_free(global.source_tree, NULL);
} }
void global_lock(void) void global_lock(void)
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#define MAX_LISTEN_SOCKETS 10 #define MAX_LISTEN_SOCKETS 10
#include "thread/thread.h" #include "thread/thread.h"
#include "slave.h"
typedef struct ice_global_tag typedef struct ice_global_tag
{ {
...@@ -36,6 +37,10 @@ typedef struct ice_global_tag ...@@ -36,6 +37,10 @@ typedef struct ice_global_tag
int schedule_config_reread; int schedule_config_reread;
avl_tree *source_tree; avl_tree *source_tree;
/* for locally defined relays */
struct _relay_server *relays;
/* relays retrieved from master */
struct _relay_server *master_relays;
cond_t shutdown_cond; cond_t shutdown_cond;
} ice_global_t; } ice_global_t;
......
...@@ -61,199 +61,388 @@ ...@@ -61,199 +61,388 @@
static void *_slave_thread(void *arg); static void *_slave_thread(void *arg);
thread_type *_slave_thread_id; thread_type *_slave_thread_id;
static int _initialized = 0; static int slave_running = 0;
static unsigned max_interval = 0; static int max_interval = 0;
relay_server *relay_free (relay_server *relay)
void slave_recheck (void)
{ {
max_interval = 0; relay_server *next = relay->next;
DEBUG1("freeing relay %s", relay->localmount);
if (relay->source)
source_free_source (relay->source);
xmlFree (relay->server);
xmlFree (relay->mount);
xmlFree (relay->localmount);
xmlFree (relay);
return next;
} }
void slave_initialize(void) { relay_server *relay_copy (relay_server *r)
ice_config_t *config; {
if (_initialized) return; relay_server *copy = calloc (1, sizeof (relay_server));
config = config_get_config(); if (copy)
/* Don't create a slave thread if it isn't configured */
if (config->master_server == NULL &&
config->relay == NULL)
{ {
config_release_config(); copy->server = xmlStrdup (r->server);
return; copy->mount = xmlStrdup (r->mount);
copy->localmount = xmlStrdup (r->localmount);
copy->port = r->port;
copy->mp3metadata = r->mp3metadata;
} }
config_release_config(); return copy;
}
_initialized = 1;
_slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED); static void *_relay_thread (void *arg)
{
relay_server *relay = arg;
relay->running = 1;
source_main (relay->source);
relay->running = 0;
if (relay->cleanup)
relay_free (relay);
return NULL;
} }
void slave_shutdown(void) {
if (!_initialized) return; void slave_recheck (void)
_initialized = 0; {
thread_join(_slave_thread_id); max_interval = 0;
} }
static void create_relay_stream(char *server, int port,
char *remotemount, char *localmount, int mp3) void slave_initialize(void)
{ {
sock_t streamsock; if (slave_running)
char header[4096]; return;
connection_t *con;
http_parser_t *parser;
client_t *client;
if(!localmount) slave_running = 1;
localmount = remotemount; _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
DEBUG1("Adding source at mountpoint \"%s\"", localmount);
streamsock = sock_connect_wto(server, port, 0); void slave_shutdown(void)
if (streamsock == SOCK_ERROR) { {
WARN2("Failed to relay stream from master server, couldn't connect to http://%s:%d", server, port); relay_server *relay;
return;
} if (!slave_running)
con = create_connection(streamsock, -1, NULL);
/* At this point we may not know if we are relaying a mp3 or vorbis stream,
* so lets send in the icy-metadata header just in case, it's harmless in
* the vorbis case. If we don't send in this header then relay will not
* have mp3 metadata.
*/
sock_write(streamsock, "GET %s HTTP/1.0\r\n"
"User-Agent: " ICECAST_VERSION_STRING "\r\n"
"Icy-MetaData: 1\r\n"
"\r\n",
remotemount);
memset(header, 0, sizeof(header));
if (util_read_header(con->sock, header, 4096) == 0) {
WARN0("Header read failed");
connection_close(con);
return; return;
} slave_running = 0;
parser = httpp_create_parser(); thread_join (_slave_thread_id);
httpp_initialize(parser, NULL);
if(!httpp_parse_response(parser, header, strlen(header), localmount)) { relay = global.relays;
if(httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) { while (relay)
ERROR1("Error parsing relay request: %s", relay = relay_free (relay);
httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)); global.relays = NULL;
relay = global.master_relays;
while (relay)
relay = relay_free (relay);
global.master_relays = NULL;
}
/* This does the actual connection for a relay. A thread is
* started off if a connection can be acquired
*/
static void start_relay_stream (relay_server *relay)
{
sock_t streamsock = SOCK_ERROR;
source_t *src = relay->source;
http_parser_t *parser = NULL;
connection_t *con=NULL;
char header[4096];
INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
do
{
streamsock = sock_connect_wto (relay->server, relay->port, 30);
if (streamsock == SOCK_ERROR)
{
WARN3("Failed to relay stream from master server, couldn't connect to http://%s:%d%s",
relay->server, relay->port, relay->mount);
break;
} }
else con = create_connection (streamsock, -1, NULL);
/* At this point we may not know if we are relaying an mp3 or vorbis
* stream, but only send the icy-metadata header if the relay details
* state so (the typical case). It's harmless in the vorbis case. If
* we don't send in this header then relay will not have mp3 metadata.
*/
sock_write(streamsock, "GET %s HTTP/1.0\r\n"
"User-Agent: " ICECAST_VERSION_STRING "\r\n"
"%s"
"\r\n",
relay->mount, relay->mp3metadata?"Icy-MetaData: 1\r\n":"");
memset (header, 0, sizeof(header));
if (util_read_header (con->sock, header, 4096) == 0)
{
WARN0("Header read failed");
break;
}
parser = httpp_create_parser();
httpp_initialize (parser, NULL);
if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
{
ERROR0("Error parsing relay request"); ERROR0("Error parsing relay request");
connection_close(con); break;
httpp_destroy(parser); }
if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
{
ERROR1("Error from relay request: %s", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
break;
}
src->parser = parser;
src->con = con;
if (connection_complete_source (src) < 0)
{
DEBUG0("Failed to complete source initialisation");
break;
}
thread_create ("Relay Thread", _relay_thread, relay, THREAD_DETACHED);
return; return;
} while (0);
if (con == NULL && streamsock != SOCK_ERROR)
sock_close (streamsock);
if (con)
connection_close (con);
src->con = NULL;
if (parser)
httpp_destroy (parser);
src->parser = NULL;
}
/* wrapper for starting the provided relay stream */
static void check_relay_stream (relay_server *relay)
{
if (relay->source == NULL)
{
/* new relay, reserve the name */
DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
relay->source = source_reserve (relay->localmount);
}
if (relay->source && !relay->running)
{
start_relay_stream (relay);
} }
}
client = client_create(con, parser); /* go through updated looking for relays that are different configured. The
if (!connection_create_source(client, con, parser, * returned list contains relays that should be kept running, current contains
httpp_getvar(parser, HTTPP_VAR_URI))) { * the list of relays to shutdown
DEBUG0("Failed to create source"); */
client_destroy(client); static relay_server *
update_relay_set (relay_server **current, relay_server *updated)
{
relay_server *relay = updated;
relay_server *existing_relay, **existing_p;
relay_server *new_list = NULL;
while (relay)
{
existing_relay = *current;
existing_p = current;
while (existing_relay)
{
if (strcmp (relay->localmount, existing_relay->localmount) == 0)
break;
existing_p = &existing_relay->next;
existing_relay = existing_relay->next;
}
if (existing_relay == NULL)
{
/* new one, copy and insert */
existing_relay = relay_copy (relay);
}
else
{
*existing_p = existing_relay->next;
}
existing_relay->next = new_list;
new_list = existing_relay;
relay = relay->next;
} }
return new_list;
}
/* update the relay_list with entries from new_relay_list. Any new relays
* are added to the list, and any not listed in the provided new_relay_list
* get marked for shutting down, just in case they are not shutting down by
* themselves
*/
static void
update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
relay_server *relay, *current;
return; current = update_relay_set (relay_list, new_relay_list);
/* ok whats left, lets make sure they shut down */
relay = *relay_list;
while (relay)
{
relay->cleanup = 1;
if (relay->source)
{
if (relay->source->running)
DEBUG1 ("requested %s to shut down", relay->source->mount);
relay->source->running = 0;
relay = relay->next;
}
else
relay = relay_free (relay);
}
/* re-assign new set */
*relay_list = current;
} }
static void *_slave_thread(void *arg) {
static int update_from_master(ice_config_t *config)
{
char *master = NULL, *password = NULL, *username= NULL;
int port;
sock_t mastersock; sock_t mastersock;
int ret = 0;
char buf[256]; char buf[256];
unsigned interval = 0; do
char *authheader, *data; {
int len; char *authheader, *data;
char *username = "relay"; relay_server *relays = NULL, *relay;
relay_server *relay; int len, count = 1;
ice_config_t *config;
while (_initialized) { username = strdup ("relay");
if (max_interval > ++interval) { if (config->master_password)
thread_sleep(1000000); password = strdup (config->master_password);
continue;
}
else {
/* In case it's been reconfigured */
config = config_get_config();
max_interval = config->master_update_interval;
interval = 0; if (config->master_server)
} master = strdup (config->master_server);
if(config->master_server != NULL) { port = config->master_server_port;
char *server = strdup (config->master_server);
int port = config->master_server_port;
char *password = NULL;
if (config->master_password != NULL)
password = strdup (config->master_password);
else
password = strdup (config->source_password);
config_release_config();
mastersock = sock_connect_wto(server, port, 0);
if (mastersock == SOCK_ERROR) {
WARN0("Relay slave failed to contact master server to fetch stream list");
free (server);
free (password);
continue;
}
len = strlen(username) + strlen(password) + 1; if (password == NULL || master == NULL || port == 0)
authheader = malloc(len+1); break;
strcpy(authheader, username); ret = 1;
strcat(authheader, ":"); config_release_config();
strcat(authheader, password); mastersock = sock_connect_wto (master, port, 0);
data = util_base64_encode(authheader);
sock_write(mastersock,
"GET /admin/streamlist.txt HTTP/1.0\r\n"
"Authorization: Basic %s\r\n"
"\r\n", data);
free(authheader);
free(data);
while (sock_read_line(mastersock, buf, sizeof(buf))) {
if(!strlen(buf))
break;
}
while (sock_read_line(mastersock, buf, sizeof(buf))) { if (mastersock == SOCK_ERROR)
avl_tree_rlock(global.source_tree); {
if (!source_find_mount(buf)) { WARN0("Relay slave failed to contact master server to fetch stream list");
avl_tree_unlock(global.source_tree); break;
}
create_relay_stream(server, port, buf, NULL, 0); len = strlen(username) + strlen(password) + 1;
} authheader = malloc(len+1);
else strcpy(authheader, username);
avl_tree_unlock(global.source_tree); strcat(authheader, ":");
strcat(authheader, password);
data = util_base64_encode(authheader);
sock_write (mastersock,
"GET /admin/streamlist.txt HTTP/1.0\r\n"
"Authorization: Basic %s\r\n"
"\r\n", data);
free(authheader);
free(data);
while (sock_read_line(mastersock, buf, sizeof(buf)))
{
if (!strlen(buf))
break;
}
while (sock_read_line(mastersock, buf, sizeof(buf)))
{
relay_server *r;
if (!strlen(buf))
continue;
DEBUG2 ("read %d from master \"%s\"", count++, buf);
r = calloc (1, sizeof (relay_server));
if (r)
{
r->server = xmlStrdup (master);
r->port = port;
r->mount = xmlStrdup (buf);
r->localmount = xmlStrdup (buf);
r->mp3metadata = 1;
r->next = relays;
relays = r;
} }
free (server);
free (password);
sock_close(mastersock);
} }
else { sock_close (mastersock);
config_release_config();
update_relays (&global.master_relays, relays);
/* start any inactive relays */
relay = global.master_relays;
while (relay)
{
check_relay_stream (relay);
relay = relay->next;
} }
relay = relays;
while (relay)
relay = relay_free (relay);
} while(0);
if (master)
free (master);
if (username)
free (username);
if (password)
free (password);
return ret;
}
static void *_slave_thread(void *arg)
{
ice_config_t *config;
relay_server *relay;
unsigned interval = 0;
/* And now, we process the individual mounts... */ while (slave_running)
{
thread_sleep (1000000);
if (max_interval > ++interval)
continue;
interval = 0;
config = config_get_config(); config = config_get_config();
relay = config->relay;
thread_mutex_lock(&(config_locks()->relay_lock));
config_release_config();
while(relay) { max_interval = config->master_update_interval;
avl_tree_rlock(global.source_tree);
if(!source_find_mount_raw(relay->localmount)) {
avl_tree_unlock(global.source_tree);
create_relay_stream(relay->server, relay->port, relay->mount, /* the connection could time some time, so the lock can drop */
relay->localmount, relay->mp3metadata); if (update_from_master (config))
} config = config_get_config();
else
avl_tree_unlock(global.source_tree); thread_mutex_lock (&(config_locks()->relay_lock));
update_relays (&global.relays, config->relay);
config_release_config();
/* start any inactive relays */
relay = global.relays;
while (relay)
{
check_relay_stream (relay);
relay = relay->next; relay = relay->next;
} }
thread_mutex_unlock (&(config_locks()->relay_lock));
thread_mutex_unlock(&(config_locks()->relay_lock));
} }
INFO0 ("Slave thread shutting down"); INFO0 ("Slave thread shutdown complete");
return NULL; return NULL;
} }