Commit ad1e6c41 authored by Karl Heyes's avatar Karl Heyes

merge in the on-demand relay implementation.

svn path=/icecast/trunk/icecast/; revision=9406
parent 5d9fe0c5
......@@ -210,6 +210,7 @@ certain formats.
<master-update-interval>120</master-update-interval>
<master-username>relay</master-username>
<master-password>hackme</master-password>
<relays-on-demand>0</relays-on-demand>
<relay>
<server>127.0.0.1</server>
......@@ -219,6 +220,7 @@ certain formats.
<username>joe</username>
<password>soap</password>
<relay-shoutcast-metadata>0</relay-shoutcast-metadata>
<on-demand>0</on-demand>
</relay>
</pre>
<p>This section contains the server's relay settings. There are two types of relays: a "Master server relay" or a "Specific Mountpoint relay." A Master server relay is only supported between icecast2 servers and is used to relays all mountpoints on a remote icecast2 server.
......@@ -268,6 +270,13 @@ used
This is the relay password on the Master server. It is used to query the
server for a list of mountpoints to relay.
</div>
<h4>relays-on-demand</h4>
<div class="indentedbox">
<p>Changes the default on-demand setting for relays, so a stream is only relayed if
listeners are connected. 1=enabled, 0=disabled (default).
</p>
</div>
<br />
<h3>Specific Mountpoint Relay</h3>
The following diagram shows the basics of doing a Specific Mountpoint relay. Note that Server 1 is configured with the &lt;relay&gt; settings and Server 2 is the server from which Server 1 will pull the specified mountpoint(s) and relay them. Using a Specific Mountpoint Relay, only those mountpoints specified on Server 1 will be relayed from Server 2.
......@@ -299,6 +308,7 @@ A server is configured as a Specific Mountpoint Server relay by specifying a &lt
&lt;username&gt;joe&lt;/username&gt;
&lt;password&gt;soap&lt;/password&gt;
&lt;relay-shoutcast-metadata&gt;0&lt;/relay-shoutcast-metadata&gt;
&lt;on-demand&gt;1&lt;/on-demand&gt;
&lt;/relay&gt;
</pre>
......@@ -331,6 +341,13 @@ The source of the relay may require authentication itself, if so state the passw
<div class="indentedbox">
If you are relaying a Shoutcast stream, you need to specify this indicator to also relay the metadata (song titles) that is part of the Shoutcast stream (1=enabled, 0=disabled).
</div>
<h4>on-demand</h4>
<div class="indentedbox">
<p>An on-demand relay will only retrieve the stream if there are listeners connected
1=enabled, 0=disabled (default is &lt;relays-on-demand&gt;).
</p>
</div>
<p>
<br />
<br />
......@@ -405,7 +422,6 @@ An optional value which will set the filename which will be a dump of the stream
being opened.
</p>
</div>
<h4>fallback-mount</h4>
<div class="indentedbox">
This optional value specifies a mountpoint that clients are automatically moved to if the source
......@@ -492,10 +508,11 @@ as defined in limits. The value is in bytes.
</div>
<h4>mp3-metadata-interval</h4>
<div class="indentedbox">
This optional setting specifies what interval, in bytes, there is between metadata updates within
shoutcast compatible streams. This only applies to new listeners connecting on this mountpoint,
not existing listeners falling back to this mountpoint. The default is either the hardcoded
server default or the value passed from a relay.
<p>This optional setting specifies what interval, in bytes, there is between metadata
updates within shoutcast compatible streams. This only applies to new listeners connecting
on this mountpoint, not existing listeners falling back to this mountpoint. The default
is either the hardcoded server default or the value passed from a relay.
</p>
</div>
<h4>hidden</h4>
<div class="indentedbox">
......
......@@ -226,7 +226,7 @@ xmlDocPtr admin_build_sourcelist (const char *mount)
continue;
}
if (source->running)
if (source->running || source->on_demand)
{
srcnode = xmlNewChild(xmlnode, NULL, "source", NULL);
xmlSetProp(srcnode, "mount", source->mount);
......@@ -369,7 +369,7 @@ void admin_handle_request(client_t *client, char *uri)
}
else
{
if (source->running == 0)
if (source->running == 0 && source->on_demand == 0)
{
avl_tree_unlock (global.source_tree);
INFO2("Received admin command %s on unavailable mount \"%s\"",
......@@ -590,7 +590,7 @@ static void command_move_clients(client_t *client, source_t *source,
return;
}
if (dest->running == 0)
if (dest->running == 0 && dest->on_demand == 0)
{
client_send_400 (client, "Destination not running");
return;
......@@ -982,7 +982,7 @@ static void command_list_mounts(client_t *client, int response)
if (source == NULL)
continue;
if (source->running == 0)
if (source->running == 0 && source->on_demand == 0)
continue;
if (source->hidden)
continue;
......
......@@ -342,6 +342,7 @@ static void _set_defaults(ice_config_t *configuration)
configuration->ice_login = CONFIG_DEFAULT_ICE_LOGIN;
configuration->fileserve = CONFIG_DEFAULT_FILESERVE;
configuration->touch_interval = CONFIG_DEFAULT_TOUCH_FREQ;
configuration->on_demand = 0;
configuration->dir_list = NULL;
configuration->hostname = CONFIG_DEFAULT_HOSTNAME;
configuration->port = 0;
......@@ -408,6 +409,10 @@ static void _parse_root(xmlDocPtr doc, xmlNodePtr node,
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
configuration->fileserve = atoi(tmp);
if (tmp) xmlFree(tmp);
} else if (strcmp(node->name, "relays-on-demand") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
configuration->on_demand = atoi(tmp);
if (tmp) xmlFree(tmp);
} else if (strcmp(node->name, "hostname") == 0) {
if (configuration->hostname && configuration->hostname != CONFIG_DEFAULT_HOSTNAME) xmlFree(configuration->hostname);
configuration->hostname = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
......@@ -697,6 +702,7 @@ static void _parse_relay(xmlDocPtr doc, xmlNodePtr node,
relay->next = NULL;
relay->mp3metadata = 1;
relay->on_demand = configuration->on_demand;
do {
if (node == NULL) break;
......@@ -732,6 +738,11 @@ static void _parse_relay(xmlDocPtr doc, xmlNodePtr node,
relay->password = (char *)xmlNodeListGetString(doc,
node->xmlChildrenNode, 1);
}
else if (strcmp(node->name, "on-demand") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
relay->on_demand = atoi(tmp);
if (tmp) xmlFree(tmp);
}
} while ((node = node->next));
if (relay->localmount == NULL)
relay->localmount = xmlStrdup (relay->mount);
......
......@@ -109,6 +109,7 @@ typedef struct ice_config_tag
int source_timeout;
int ice_login;
int fileserve;
int on_demand; /* global setting for all relays */
char *shoutcast_mount;
char *source_password;
......
......@@ -896,7 +896,7 @@ static void _handle_get_request (client_t *client, char *passed_uri)
if (uri != passed_uri) free (uri);
return;
}
if (source->running == 0)
if (source->running == 0 && source->on_demand == 0)
{
avl_tree_unlock(global.source_tree);
DEBUG0("inactive source, client dropped");
......@@ -953,6 +953,14 @@ static void _handle_get_request (client_t *client, char *passed_uri)
avl_tree_wlock(source->pending_tree);
avl_insert(source->pending_tree, (void *)client);
avl_tree_unlock(source->pending_tree);
if (source->running == 0 && source->on_demand)
{
/* enable on-demand relay to start, wake up the slave thread */
DEBUG0("kicking off on-demand relay");
source->on_demand_req = 1;
slave_rescan ();
}
}
avl_tree_unlock(global.source_tree);
......
......@@ -114,6 +114,7 @@ static void find_client_start (source_t *source, client_t *client)
{
client_set_queue (client, refbuf);
client->check_buffer = format_advance_queue;
client->write_to_client = source->format->write_buf_to_client;
client->intro_offset = -1;
break;
}
......
......@@ -99,6 +99,7 @@ relay_server *relay_copy (relay_server *r)
copy->password = xmlStrdup (r->password);
copy->port = r->port;
copy->mp3metadata = r->mp3metadata;
copy->on_demand = r->on_demand;
}
return copy;
}
......@@ -114,6 +115,15 @@ void slave_recheck_mounts (void)
}
/* Request slave thread to rescan the existing relays to see if any need
* starting up, eg on-demand relays
*/
void slave_rescan (void)
{
rescan_relays = 1;
}
/* Request slave thread to check the relay list for changes and to
* update the stats for the current streams.
*/
......@@ -235,6 +245,13 @@ static void *start_relay_stream (void *arg)
source_main (relay->source);
if (relay->on_demand == 0)
{
/* only keep refreshing YP entries for inactive on-demand relays */
yp_remove (relay->localmount);
relay->source->yp_public = -1;
}
/* initiate an immediate relay cleanup run */
relay->cleanup = 1;
rescan_relays = 1;
......@@ -242,6 +259,20 @@ static void *start_relay_stream (void *arg)
return NULL;
} while (0);
DEBUG1 ("failed relay, fallback to %s", relay->source->fallback_mount);
if (relay->source->fallback_mount)
{
source_t *fallback_source;
avl_tree_rlock(global.source_tree);
fallback_source = source_find_mount (relay->source->fallback_mount);
if (fallback_source != NULL)
source_move_clients (relay->source, fallback_source);
avl_tree_unlock (global.source_tree);
}
if (con == NULL && streamsock != SOCK_ERROR)
sock_close (streamsock);
if (con)
......@@ -278,13 +309,46 @@ static void check_relay_stream (relay_server *relay)
else
WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
}
if (relay->source && !relay->running)
do
{
source_t *source = relay->source;
if (relay->source == NULL || relay->running)
break;
if (relay->on_demand)
{
ice_config_t *config = config_get_config ();
mount_proxy *mountinfo = config_find_mount (config, relay->localmount);
if (mountinfo == NULL)
source_update_settings (config, relay->source, mountinfo);
config_release_config ();
slave_rebuild_mounts();
stats_event (relay->localmount, "listeners", "0");
relay->source->on_demand = relay->on_demand;
if (source->fallback_mount && source->fallback_override)
{
source_t *fallback;
DEBUG1 ("checking %s for fallback override", source->fallback_mount);
avl_tree_rlock (global.source_tree);
fallback = source_find_mount (source->fallback_mount);
if (fallback && fallback->running && fallback->listeners)
{
DEBUG2 ("fallback running %d with %lu listeners", fallback->running, fallback->listeners);
source->on_demand_req = 1;
}
avl_tree_unlock (global.source_tree);
}
if (source->on_demand_req == 0)
break;
}
relay->thread = thread_create ("Relay Thread", start_relay_stream,
relay, THREAD_ATTACHED);
return;
}
/* the relay thread may of close down */
} while(0);
/* the relay thread may of shut down itself */
if (relay->cleanup && relay->thread)
{
DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
......@@ -292,6 +356,15 @@ static void check_relay_stream (relay_server *relay)
relay->thread = NULL;
relay->cleanup = 0;
relay->running = 0;
if (relay->on_demand)
{
ice_config_t *config = config_get_config ();
mount_proxy *mountinfo = config_find_mount (config, relay->localmount);
source_update_settings (config, relay->source, mountinfo);
config_release_config ();
stats_event (relay->localmount, "listeners", "0");
}
}
}
......@@ -311,6 +384,8 @@ static int relay_has_changed (relay_server *new, relay_server *old)
break;
if (new->mp3metadata != old->mp3metadata)
break;
if (new->on_demand != old->on_demand)
old->on_demand = new->on_demand;
return 0;
} while (0);
return 1;
......@@ -421,6 +496,7 @@ static int update_from_master(ice_config_t *config)
char *authheader, *data;
relay_server *new_relays = NULL, *cleanup_relays;
int len, count = 1;
int on_demand;
username = strdup (config->master_username);
if (config->master_password)
......@@ -433,6 +509,7 @@ static int update_from_master(ice_config_t *config)
if (password == NULL || master == NULL || port == 0)
break;
on_demand = config->on_demand;
ret = 1;
config_release_config();
mastersock = sock_connect_wto (master, port, 0);
......@@ -481,6 +558,7 @@ static int update_from_master(ice_config_t *config)
r->mount = xmlStrdup (buf);
r->localmount = xmlStrdup (buf);
r->mp3metadata = 1;
r->on_demand = on_demand;
r->next = new_relays;
new_relays = r;
}
......
......@@ -24,6 +24,7 @@ typedef struct _relay_server {
char *localmount;
struct source_tag *source;
int mp3metadata;
int on_demand;
int running;
int cleanup;
thread_type *thread;
......@@ -35,6 +36,7 @@ void slave_initialize(void);
void slave_shutdown(void);
void slave_recheck_mounts (void);
void slave_rebuild_mounts (void);
void slave_rescan (void);
relay_server *relay_free (relay_server *relay);
#endif /* __SLAVE_H__ */
......@@ -152,8 +152,11 @@ source_t *source_find_mount (const char *mount)
{
source = source_find_mount_raw(mount);
if (source != NULL && source->running)
break;
if (source)
{
if (source->running || source->on_demand)
break;
}
/* we either have a source which is not active (relay) or no source
* at all. Check the mounts list for fallback settings
......@@ -229,9 +232,6 @@ void source_clear_source (source_t *source)
}
source->stream_data_tail = NULL;
if (source->yp_public)
yp_remove (source->mount);
source->burst_point = NULL;
source->burst_size = 0;
source->burst_offset = 0;
......@@ -257,6 +257,8 @@ void source_clear_source (source_t *source)
fclose (source->intro_file);
source->intro_file = NULL;
}
source->on_demand_req = 0;
}
......@@ -271,6 +273,9 @@ void source_free_source (source_t *source)
avl_tree_free(source->pending_tree, _free_client);
avl_tree_free(source->client_tree, _free_client);
/* make sure all YP entries have gone */
yp_remove (source->mount);
free (source->mount);
free (source);
......@@ -306,6 +311,7 @@ client_t *source_find_client(source_t *source, int id)
*/
void source_move_clients (source_t *source, source_t *dest)
{
unsigned long count = 0;
if (strcmp (source->mount, dest->mount) == 0)
{
WARN1 ("src and dst are the same \"%s\", skipping", source->mount);
......@@ -316,7 +322,7 @@ void source_move_clients (source_t *source, source_t *dest)
/* if the destination is not running then we can't move clients */
if (dest->running == 0)
if (dest->running == 0 && dest->on_demand == 0)
{
WARN1 ("destination mount %s not running, unable to move clients ", dest->mount);
thread_mutex_unlock (&move_clients_mutex);
......@@ -331,15 +337,18 @@ void source_move_clients (source_t *source, source_t *dest)
/* we need to move the client and pending trees */
avl_tree_wlock (source->pending_tree);
if (source->format == NULL)
if (source->on_demand == 0 && source->format == NULL)
{
INFO1 ("source mount %s is not available", source->mount);
break;
}
if (source->format->type != dest->format->type)
if (source->format && dest->format)
{
WARN2 ("stream %s and %s are of different types, ignored", source->mount, dest->mount);
break;
if (source->format->type != dest->format->type)
{
WARN2 ("stream %s and %s are of different types, ignored", source->mount, dest->mount);
break;
}
}
while (1)
......@@ -350,10 +359,18 @@ void source_move_clients (source_t *source, source_t *dest)
client = (client_t *)(node->key);
avl_delete (source->pending_tree, client, NULL);
/* switch client to different queue */
client_set_queue (client, NULL);
client->check_buffer = format_check_file_buffer;
/* when switching a client to a different queue, be wary of the
* refbuf it's referring to, if it's http headers then we need
* to write them so don't release it.
*/
if (client->check_buffer != format_check_http_buffer)
{
client_set_queue (client, NULL);
client->check_buffer = format_check_file_buffer;
}
avl_insert (dest->pending_tree, (void *)client);
count++;
}
avl_tree_wlock (source->client_tree);
......@@ -366,17 +383,33 @@ void source_move_clients (source_t *source, source_t *dest)
client = (client_t *)(node->key);
avl_delete (source->client_tree, client, NULL);
/* switch client to different queue */
client_set_queue (client, NULL);
client->check_buffer = format_check_file_buffer;
/* when switching a client to a different queue, be wary of the
* refbuf it's referring to, if it's http headers then we need
* to write them so don't release it.
*/
if (client->check_buffer != format_check_http_buffer)
{
client_set_queue (client, NULL);
client->check_buffer = format_check_file_buffer;
}
avl_insert (dest->pending_tree, (void *)client);
count++;
}
INFO2 ("passing %lu listeners to \"%s\"", count, dest->mount);
source->listeners = 0;
stats_event (source->mount, "listeners", "0");
avl_tree_unlock (source->client_tree);
} while (0);
/* see if we need to wake up an on-demand relay */
if (dest->running == 0 && dest->on_demand && count)
{
dest->on_demand_req = 1;
slave_rebuild_mounts();
}
avl_tree_unlock (source->pending_tree);
avl_tree_unlock (dest->pending_tree);
thread_mutex_unlock (&move_clients_mutex);
......@@ -695,6 +728,8 @@ void source_main (source_t *source)
{
INFO2("listener count on %s now %lu", source->mount, source->listeners);
stats_event_args (source->mount, "listeners", "%lu", source->listeners);
if (source->listeners == 0 && source->on_demand)
source->running = 0;
}
/* lets reduce the queue, any lagging clients should of been
......@@ -1051,6 +1086,14 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy
DEBUG1 ("intro file is %s", mountinfo->intro_filename);
if (source->dumpfilename)
DEBUG1 ("Dumping stream to %s", source->dumpfilename);
if (source->on_demand)
{
DEBUG0 ("on_demand set");
stats_event (source->mount, "on_demand", "1");
}
else
stats_event (source->mount, "on_demand", NULL);
if (source->hidden)
{
stats_event_hidden (source->mount, NULL, 1);
......
......@@ -68,6 +68,8 @@ typedef struct source_tag
unsigned int queue_size_limit;
unsigned timeout; /* source timeout in seconds */
int on_demand;
int on_demand_req;
int hidden;
time_t last_read;
int short_delay;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment