Commit b3b2695a authored by Karl Heyes's avatar Karl Heyes

Handle http 302 response when a relay starts. The socket IO is isolated into a

separate function for loop handling and log messages are updated.


svn path=/icecast/trunk/icecast/; revision=13533
parent 2b817dae
......@@ -38,6 +38,7 @@ extern int playlistlog;
#define ERROR1(y, a) log_write(errorlog, 1, CATMODULE "/", __func__, y, a)
#define ERROR2(y, a, b) log_write(errorlog, 1, CATMODULE "/", __func__, y, a, b)
#define ERROR3(y, a, b, c) log_write(errorlog, 1, CATMODULE "/", __func__, y, a, b, c)
#define ERROR4(y, a, b, c, d) log_write(errorlog, 1, CATMODULE "/", __func__, y, a, b, c, d)
#define WARN0(y) log_write(errorlog, 2, CATMODULE "/", __func__, y)
#define WARN1(y, a) log_write(errorlog, 2, CATMODULE "/", __func__, y, a)
......
......@@ -145,52 +145,52 @@ void slave_shutdown(void)
}
/* This does the actual connection for a relay. A thread is
* started off if a connection can be acquired
/* Actually open the connection and do some http parsing, handle any 302
* responses within here.
*/
static void *start_relay_stream (void *arg)
static client_t *open_relay_connection (relay_server *relay)
{
relay_server *relay = arg;
sock_t streamsock = SOCK_ERROR;
source_t *src = relay->source;
int redirects = 0;
http_parser_t *parser = NULL;
connection_t *con=NULL;
char *server = strdup (relay->server);
char *mount = strdup (relay->mount);
int port = relay->port;
char *auth_header;
char header[4096];
relay->running = 1;
INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
do
/* build any authentication header before connecting */
if (relay->username && relay->password)
{
char *auth_header;
char *esc_authorisation;
unsigned len = strlen(relay->username) + strlen(relay->password) + 2;
auth_header = malloc (len);
snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
esc_authorisation = util_base64_encode(auth_header);
free(auth_header);
len = strlen (esc_authorisation) + 24;
auth_header = malloc (len);
snprintf (auth_header, len,
"Authorization: Basic %s\r\n", esc_authorisation);
free(esc_authorisation);
}
else
auth_header = strdup ("");
streamsock = sock_connect_wto (relay->server, relay->port, 30);
while (redirects < 10)
{
sock_t streamsock;
INFO2 ("connecting to %s:%d", server, port);
streamsock = sock_connect_wto (server, port, 10);
if (streamsock == SOCK_ERROR)
{
WARN3("Failed to relay stream from master server, couldn't connect to http://%s:%d%s",
relay->server, relay->port, relay->mount);
WARN2 ("Failed to connect to %s:%d", server, port);
break;
}
con = connection_create (streamsock, -1, NULL);
if (relay->username && relay->password)
{
char *esc_authorisation;
unsigned len = strlen(relay->username) + strlen(relay->password) + 2;
auth_header = malloc (len);
snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
esc_authorisation = util_base64_encode(auth_header);
free(auth_header);
len = strlen (esc_authorisation) + 24;
auth_header = malloc (len);
snprintf (auth_header, len,
"Authorization: Basic %s\r\n", esc_authorisation);
free(esc_authorisation);
}
else
{
auth_header = strdup ("");
}
con = connection_create (streamsock, -1, strdup (server));
/* At this point we may not know if we are relaying an mp3 or vorbis
* stream, but only send the icy-metadata header if the relay details
......@@ -202,54 +202,132 @@ static void *start_relay_stream (void *arg)
"%s"
"%s"
"\r\n",
relay->mount,
mount,
ICECAST_VERSION_STRING,
relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
auth_header);
free (auth_header);
memset (header, 0, sizeof(header));
if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
{
WARN0("Header read failed");
ERROR4 ("Header read failed for %s (%s:%d%s)", relay->localmount, server, port, mount);
break;
}
parser = httpp_create_parser();
httpp_initialize (parser, NULL);
if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
{
ERROR0("Error parsing relay request");
ERROR4("Error parsing relay request for %s (%s:%d%s)", relay->localmount,
server, port, mount);
break;
}
if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
{
ERROR1("Error from relay request: %s", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
break;
}
src->parser = parser;
src->con = con;
/* better retry the connection again but with different details */
const char *uri, *mountpoint;
int len;
global_lock ();
if (client_create (&src->client, con, parser) < 0)
{
global_unlock ();
/* make sure only the client_destory frees these */
uri = httpp_getvar (parser, "location");
INFO1 ("redirect received %s", uri);
if (strncmp (uri, "http://", 7) != 0)
break;
uri += 7;
mountpoint = strchr (uri, '/');
free (mount);
if (mountpoint)
mount = strdup (mountpoint);
else
mount = strdup ("/");
len = strcspn (uri, ":/");
port = 80;
if (uri [len] == ':')
port = atoi (uri+len+1);
free (server);
server = calloc (1, len+1);
strncpy (server, uri, len);
connection_close (con);
httpp_destroy (parser);
con = NULL;
parser = NULL;
break;
}
global_unlock ();
sock_set_blocking (streamsock, SOCK_NONBLOCK);
con = NULL;
parser = NULL;
client_set_queue (src->client, NULL);
else
{
client_t *client = NULL;
if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
{
ERROR2("Error from relay request: %s (%s)", relay->localmount,
httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
break;
}
global_lock ();
if (client_create (&client, con, parser) < 0)
{
global_unlock ();
/* make sure only the client_destory frees these */
con = NULL;
parser = NULL;
client_destroy (client);
break;
}
global_unlock ();
sock_set_blocking (streamsock, SOCK_NONBLOCK);
client_set_queue (client, NULL);
free (server);
free (mount);
free (auth_header);
return client;
}
redirects++;
}
/* failed, better clean up */
free (server);
free (mount);
free (auth_header);
if (con)
connection_close (con);
if (parser)
httpp_destroy (parser);
return NULL;
}
/* This does the actual connection for a relay. A thread is
* started off if a connection can be acquired
*/
static void *start_relay_stream (void *arg)
{
relay_server *relay = arg;
source_t *src = relay->source;
client_t *client;
ice_config_t *config;
INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
do
{
client = open_relay_connection (relay);
if (client == NULL)
continue;
src->client = client;
src->parser = client->parser;
src->con = client->con;
if (connection_complete_source (src, 0) < 0)
{
DEBUG0("Failed to complete source initialisation");
break;
INFO0("Failed to complete source initialisation");
client_destroy (client);
src->client = NULL;
continue;
}
stats_event_inc(NULL, "source_relay_connections");
stats_event (relay->localmount, "source_ip", relay->server);
stats_event (relay->localmount, "source_ip", client->con->ip);
config = config_get_config();
stats_event_args (relay->localmount, "listenurl", "http://%s:%d%s",
config->hostname, config->port, relay->localmount);
config_release_config();
source_main (relay->source);
......@@ -265,7 +343,7 @@ static void *start_relay_stream (void *arg)
relay->cleanup = 1;
return NULL;
} while (0);
} while (0); /* TODO allow looping through multiple servers */
if (relay->source->fallback_mount)
{
......@@ -281,12 +359,6 @@ static void *start_relay_stream (void *arg)
avl_tree_unlock (global.source_tree);
}
if (con)
connection_close (con);
src->con = NULL;
if (parser)
httpp_destroy (parser);
src->parser = NULL;
source_clear_source (relay->source);
/* cleanup relay, but prevent this relay from starting up again too soon */
......@@ -331,7 +403,6 @@ static void check_relay_stream (relay_server *relay)
if (source->fallback_mount && source->fallback_override)
{
source_t *fallback;
DEBUG1 ("checking %s for fallback override", source->fallback_mount);
avl_tree_rlock (global.source_tree);
fallback = source_find_mount (source->fallback_mount);
if (fallback && fallback->running && fallback->listeners)
......@@ -346,6 +417,7 @@ static void check_relay_stream (relay_server *relay)
}
relay->start = time(NULL) + 5;
relay->running = 1;
relay->thread = thread_create ("Relay Thread", start_relay_stream,
relay, THREAD_ATTACHED);
return;
......@@ -363,7 +435,7 @@ static void check_relay_stream (relay_server *relay)
relay->cleanup = 0;
relay->running = 0;
if (relay->on_demand)
if (relay->on_demand && relay->source)
{
ice_config_t *config = config_get_config ();
mount_proxy *mountinfo = config_find_mount (config, relay->localmount);
......@@ -472,6 +544,7 @@ static void relay_check_streams (relay_server *to_start,
{
/* relay has been removed from xml, shut down active relay */
DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
to_free->running = 0;
to_free->source->running = 0;
thread_join (to_free->thread);
slave_rebuild_mounts();
......@@ -654,7 +727,7 @@ static void *_slave_thread(void *arg)
source_recheck_mounts();
}
}
DEBUG0 ("shutting down current relays");
INFO0 ("shutting down current relays");
relay_check_streams (NULL, global.relays, 0);
relay_check_streams (NULL, global.master_relays, 0);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment