Commit 55ba59f3 authored by Karl Heyes's avatar Karl Heyes

Update of the YP code. This should resolve several YP issues that

have been reported, the main one being icecast instability when
there is a YP server outage.


svn path=/icecast/trunk/icecast/; revision=6646
parent dc990351
......@@ -92,7 +92,7 @@ then
XIPH_PATH_CURL([
AC_CHECK_DECL([CURLOPT_NOSIGNAL],
[ AC_DEFINE([USE_YP], 1, [Define to compile in YP support code])
ICECAST_OPTIONAL="$ICECAST_OPTIONAL geturl.o yp.o"
ICECAST_OPTIONAL="$ICECAST_OPTIONAL yp.o"
XIPH_VAR_APPEND([XIPH_CPPFLAGS],[$CURL_CFLAGS])
XIPH_VAR_PREPEND([XIPH_LIBS],[$CURL_LIBS])
], [ AC_MSG_NOTICE([Your curl dev files are too old (7.10 or above required), YP disabled])
......
......@@ -8,11 +8,11 @@ bin_PROGRAMS = icecast
noinst_HEADERS = admin.h cfgfile.h os.h logging.h sighandler.h connection.h global.h\
util.h slave.h source.h stats.h refbuf.h client.h format.h format_vorbis.h\
compat.h format_mp3.h fserve.h xslt.h geturl.h yp.h event.h auth.h md5.h
compat.h format_mp3.h fserve.h xslt.h yp.h event.h auth.h md5.h
icecast_SOURCES = cfgfile.c main.c logging.c sighandler.c connection.c global.c\
util.c slave.c source.c stats.c refbuf.c client.c format.c format_vorbis.c\
format_mp3.c xslt.c fserve.c event.c admin.c auth.c md5.c
EXTRA_icecast_SOURCES = geturl.c yp.c
EXTRA_icecast_SOURCES = yp.c
icecast_DEPENDENCIES = @ICECAST_OPTIONAL@ net/libicenet.la thread/libicethread.la \
httpp/libicehttpp.la log/libicelog.la avl/libiceavl.la timing/libicetiming.la
......
......@@ -779,10 +779,6 @@ static void command_metadata(client_t *client, source_t *source)
char *action;
char *value;
mp3_state *state;
#ifdef USE_YP
int i;
time_t current_time;
#endif
DEBUG0("Got metadata update request");
......@@ -813,15 +809,9 @@ static void command_metadata(client_t *client, source_t *source)
source->mount, value);
stats_event(source->mount, "title", value);
#ifdef USE_YP
/* If we get an update on the mountpoint, force a
yp touch */
current_time = time(NULL);
for (i=0; i<source->num_yp_directories; i++) {
source->ypdata[i]->yp_last_touch = current_time -
source->ypdata[i]->yp_touch_interval + 2;
}
#endif
yp_touch (source->mount);
html_success(client, "Metadata update successful");
}
......
......@@ -357,6 +357,7 @@ static int format_mp3_get_buffer(format_plugin_t *self, char *data,
state->metadata_buffer = NULL;
state->metadata_age++;
thread_mutex_unlock(&(state->lock));
yp_touch (self->mount);
}
state->offset = 0;
......
......@@ -122,9 +122,6 @@ int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long le
refbuf_t *refbuf, *source_refbuf;
vstate_t *state = (vstate_t *)self->_state;
source_t *source;
#ifdef USE_YP
time_t current_time;
#endif
if (data) {
/* write the data to the buffer */
......@@ -199,21 +196,7 @@ int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long le
}
thread_mutex_unlock(&source->queue_mutex);
#ifdef USE_YP
/* If we get an update on the mountpoint, force a
yp touch */
if (source) {
/* If we get an update on the mountpoint, force a
yp touch */
current_time = time(NULL);
for (i=0; i<source->num_yp_directories; i++) {
source->ypdata[i]->yp_last_touch = current_time -
source->ypdata[i]->yp_touch_interval + 2;
}
}
#endif
yp_touch (self->mount);
}
}
......
/* Icecast
*
* This program is distributed under the GNU General Public License, version 2.
* A copy of this license is included with this source.
*
* Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
* Michael Smith <msmith@xiph.org>,
* oddsock <oddsock@xiph.org>,
* Karl Heyes <karl@xiph.org>
* and others (see AUTHORS for details).
*/
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <thread/thread.h>
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "logging.h"
#include "format.h"
#include "geturl.h"
#include "source.h"
#include "cfgfile.h"
#include <curl/curl.h>
#include <curl/types.h>
#include <curl/easy.h>
#define CATMODULE "geturl"
static curl_connection curl_connections[NUM_CONNECTIONS];
static mutex_t _curl_mutex;
size_t curl_write_memory_callback(void *ptr, size_t size,
size_t nmemb, void *data)
{
register int realsize = size * nmemb;
struct curl_memory_struct *mem = (struct curl_memory_struct *)data;
if ((realsize + mem->size) < YP_RESPONSE_SIZE-1) {
strncat(mem->memory, ptr, realsize);
}
return realsize;
}
size_t curl_header_memory_callback(void *ptr, size_t size,
size_t nmemb, void *data)
{
char *p1 = 0;
char *p2 = 0;
int copylen = 0;
register int realsize = size * nmemb;
struct curl_memory_struct2 *mem = (struct curl_memory_struct2 *)data;
if (!strncmp(ptr, "SID: ", strlen("SID: "))) {
p1 = (char *)ptr + strlen("SID: ");
p2 = strchr((const char *)p1, '\r');
memset(mem->sid, '\000', sizeof(mem->sid));
if (p2) {
if (p2-p1 > sizeof(mem->sid)-1) {
copylen = sizeof(mem->sid)-1;
}
else {
copylen = p2-p1;
}
strncpy(mem->sid, p1, copylen);
}
else {
strncpy(mem->sid, p1, sizeof(mem->sid)-1);
}
}
if (!strncmp(ptr, "YPMessage: ", strlen("YPMessage: "))) {
p1 = (char *)ptr + strlen("YPMessage: ");
p2 = strchr((const char *)p1, '\r');
memset(mem->message, '\000', sizeof(mem->message));
if (p2) {
if (p2-p1 > sizeof(mem->message)-1) {
copylen = sizeof(mem->message)-1;
}
else {
copylen = p2-p1;
}
strncpy(mem->message, p1, copylen);
}
else {
strncpy(mem->message, p1, sizeof(mem->message)-1);
}
}
if (!strncmp(ptr, "TouchFreq: ", strlen("TouchFreq: "))) {
p1 = (char *)ptr + strlen("TouchFreq: ");
mem->touch_interval = atoi(p1);
}
if (!strncmp(ptr, "YPResponse: ", strlen("YPResponse: "))) {
p1 = (char *)ptr + strlen("YPResponse: ");
mem->response = atoi(p1);
}
return realsize;
}
int curl_initialize()
{
int i = 0;
thread_mutex_create(&_curl_mutex);
memset(&curl_connections, 0, sizeof(curl_connections));
for (i=0; i<NUM_CONNECTIONS; i++) {
curl_connections[i].curl_handle = curl_easy_init();
curl_easy_setopt(curl_connections[i].curl_handle,
CURLOPT_WRITEFUNCTION, curl_write_memory_callback);
curl_easy_setopt(curl_connections[i].curl_handle,
CURLOPT_WRITEHEADER,
(void *)&(curl_connections[i].header_result));
curl_easy_setopt(curl_connections[i].curl_handle,
CURLOPT_HEADERFUNCTION, curl_header_memory_callback);
curl_easy_setopt(curl_connections[i].curl_handle,
CURLOPT_FILE, (void *)&(curl_connections[i].result));
}
return(1);
}
void curl_shutdown()
{
int i = 0;
for (i=0; i<NUM_CONNECTIONS; i++) {
curl_easy_cleanup(curl_connections[i].curl_handle);
memset(&(curl_connections[i]), 0, sizeof(curl_connections[i]));
}
curl_global_cleanup();
}
int curl_get_connection()
{
int found = 0;
int curl_connection = -1;
int i = 0;
while (!found) {
thread_mutex_lock(&_curl_mutex);
for (i=0; i<NUM_CONNECTIONS; i++) {
if (!curl_connections[i].in_use) {
found = 1;
curl_connections[i].in_use = 1;
curl_connection = i;
break;
}
}
thread_mutex_unlock(&_curl_mutex);
#ifdef WIN32
Sleep(200);
#else
usleep(200);
#endif
}
return(curl_connection);
}
int curl_release_connection(int which)
{
thread_mutex_lock(&_curl_mutex);
curl_connections[which].in_use = 0;
memset(&(curl_connections[which].result), 0,
sizeof(curl_connections[which].result));
memset(&(curl_connections[which].header_result), 0,
sizeof(curl_connections[which].header_result));
thread_mutex_unlock(&_curl_mutex);
return 1;
}
void curl_print_header_result(struct curl_memory_struct2 *mem) {
DEBUG1("SID -> (%s)", mem->sid);
DEBUG1("Message -> (%s)", mem->message);
DEBUG1("Touch Freq -> (%d)", mem->touch_interval);
DEBUG1("Response -> (%d)", mem->response);
}
CURL *curl_get_handle(int which)
{
return curl_connections[which].curl_handle;
}
struct curl_memory_struct *curl_get_result(int which)
{
return &(curl_connections[which].result);
}
struct curl_memory_struct2 *curl_get_header_result(int which)
{
return &(curl_connections[which].header_result);
}
/* Icecast
*
* This program is distributed under the GNU General Public License, version 2.
* A copy of this license is included with this source.
*
* Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
* Michael Smith <msmith@xiph.org>,
* oddsock <oddsock@xiph.org>,
* Karl Heyes <karl@xiph.org>
* and others (see AUTHORS for details).
*/
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
#ifndef __GETURL_H__
#define __GETURL_H__
#include <stdio.h>
#include <curl/curl.h>
#include <curl/types.h>
#include <curl/easy.h>
#define NUM_CONNECTIONS 10
#define NAK 0
#define ACK 1
#define YP_RESPONSE_SIZE 2046
#define YP_SID_SIZE 255
struct curl_memory_struct {
char memory[YP_RESPONSE_SIZE];
size_t size;
};
struct curl_memory_struct2 {
char sid[YP_SID_SIZE];
char message[YP_RESPONSE_SIZE];
int touch_interval;
int response;
size_t size;
};
typedef struct tag_curl_connection {
struct curl_memory_struct result;
struct curl_memory_struct2 header_result;
CURL *curl_handle;
int in_use;
} curl_connection;
int curl_initialize();
void curl_shutdown();
CURL *curl_get_handle(int which);
struct curl_memory_struct *curl_get_result(int which);
struct curl_memory_struct2 *curl_get_header_result(int which);
void curl_print_header_result(struct curl_memory_struct2 *mem);
int curl_get_connection();
int curl_release_connection(int which);
#endif
......@@ -48,10 +48,7 @@
#include "logging.h"
#include "xslt.h"
#include "fserve.h"
#ifdef USE_YP
#include "geturl.h"
#include "yp.h"
#endif
#include <libxml/xmlmemory.h>
......@@ -103,9 +100,6 @@ static void _initialize_subsystems(void)
global_initialize();
refbuf_initialize();
xslt_initialize();
#ifdef USE_YP
curl_initialize();
#endif
}
static void _shutdown_subsystems(void)
......@@ -114,10 +108,8 @@ static void _shutdown_subsystems(void)
xslt_shutdown();
refbuf_shutdown();
slave_shutdown();
yp_shutdown();
stats_shutdown();
#ifdef USE_YP
curl_shutdown();
#endif
/* Now that these are done, we can stop the loggers. */
_stop_logging();
......@@ -478,10 +470,8 @@ int main(int argc, char **argv)
/* let her rip */
global.running = ICE_RUNNING;
#ifdef USE_YP
/* Startup yp thread */
yp_initialize();
#endif
/* Do this after logging init */
slave_initialize();
......
......@@ -45,9 +45,6 @@
#include "logging.h"
#include "cfgfile.h"
#include "util.h"
#ifdef USE_YP
#include "geturl.h"
#endif
#include "source.h"
#include "format.h"
#include "auth.h"
......@@ -62,7 +59,7 @@ mutex_t move_clients_mutex;
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
static int _parse_audio_info(source_t *source, char *s);
static void _parse_audio_info (source_t *source, const char *s);
/* Allocate a new source with the stated mountpoint, if one already
* exists with that mountpoint in the global source tree then return
......@@ -195,9 +192,6 @@ int source_compare_sources(void *arg, void *a, void *b)
void source_clear_source (source_t *source)
{
refbuf_t *refbuf;
#ifdef USE_YP
int i;
#endif
DEBUG1 ("clearing source \"%s\"", source->mount);
client_destroy(source->client);
source->client = NULL;
......@@ -233,17 +227,9 @@ void source_clear_source (source_t *source)
source->format->free_plugin (source->format);
}
source->format = NULL;
#ifdef USE_YP
for (i=0; i<source->num_yp_directories; i++)
{
yp_destroy_ypdata(source->ypdata[i]);
source->ypdata[i] = NULL;
}
source->num_yp_directories = 0;
if (source->yp_public)
yp_remove (source->mount);
util_dict_free (source->audio_info);
source->audio_info = NULL;
#endif
source->queue_size_limit = 0;
source->listeners = 0;
source->no_mount = 0;
......@@ -380,108 +366,8 @@ void source_move_clients (source_t *source, source_t *dest)
static void source_init (source_t *source)
{
ice_config_t *config = config_get_config();
char *listenurl;
char *listenurl, *str;
int listen_url_size;
#ifdef USE_YP
char *s;
time_t current_time;
int i;
char *ai;
for (i=0;i<config->num_yp_directories;i++) {
if (config->yp_url[i]) {
source->ypdata[source->num_yp_directories] = yp_create_ypdata();
source->ypdata[source->num_yp_directories]->yp_url =
strdup (config->yp_url[i]);
source->ypdata[source->num_yp_directories]->yp_url_timeout =
config->yp_url_timeout[i];
source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
source->num_yp_directories++;
}
}
source->audio_info = util_dict_new();
/* ice-* is icecast, icy-* is shoutcast */
if ((s = httpp_getvar(source->parser, "ice-url"))) {
add_yp_info(source, "server_url", s, YP_SERVER_URL);
}
if ((s = httpp_getvar(source->parser, "ice-name"))) {
add_yp_info(source, "server_name", s, YP_SERVER_NAME);
}
if ((s = httpp_getvar(source->parser, "icy-name"))) {
add_yp_info(source, "server_name", s, YP_SERVER_NAME);
}
if ((s = httpp_getvar(source->parser, "ice-url"))) {
add_yp_info(source, "server_url", s, YP_SERVER_URL);
}
if ((s = httpp_getvar(source->parser, "icy-url"))) {
add_yp_info(source, "server_url", s, YP_SERVER_URL);
}
if ((s = httpp_getvar(source->parser, "ice-genre"))) {
add_yp_info(source, "genre", s, YP_SERVER_GENRE);
}
if ((s = httpp_getvar(source->parser, "icy-genre"))) {
add_yp_info(source, "genre", s, YP_SERVER_GENRE);
}
if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
add_yp_info(source, "bitrate", s, YP_BITRATE);
}
if ((s = httpp_getvar(source->parser, "icy-br"))) {
add_yp_info(source, "bitrate", s, YP_BITRATE);
}
if ((s = httpp_getvar(source->parser, "ice-description"))) {
add_yp_info(source, "server_description", s, YP_SERVER_DESC);
}
if ((s = httpp_getvar(source->parser, "ice-public"))) {
stats_event(source->mount, "public", s);
source->yp_public = atoi(s);
}
if ((s = httpp_getvar(source->parser, "icy-pub"))) {
stats_event(source->mount, "public", s);
source->yp_public = atoi(s);
}
if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
stats_event(source->mount, "audio_info", s);
if (_parse_audio_info(source, s)) {
ai = util_dict_urlencode(source->audio_info, '&');
add_yp_info(source, "audio_info",
ai,
YP_AUDIO_INFO);
if (ai) {
free(ai);
}
}
}
for (i=0;i<source->num_yp_directories;i++) {
add_yp_info(source, "server_type",
source->format->format_description,
YP_SERVER_TYPE);
if (source->ypdata[i]->listen_url) {
free(source->ypdata[i]->listen_url);
}
/* 6 for max size of port */
listen_url_size = strlen("http://") + strlen(config->hostname) +
strlen(":") + 6 + strlen (source->mount) + 1;
source->ypdata[i]->listen_url = malloc (listen_url_size);
sprintf (source->ypdata[i]->listen_url, "http://%s:%d%s",
config->hostname, config->port, source->mount);
}
if(source->yp_public) {
current_time = time(NULL);
for (i=0;i<source->num_yp_directories;i++) {
/* Give the source 5 seconds to update the metadata
before we do our first touch */
/* Don't permit touch intervals of less than 30 seconds */
if (source->ypdata[i]->yp_touch_interval <= 30) {
source->ypdata[i]->yp_touch_interval = 30;
}
source->ypdata[i]->yp_last_touch = 0;
}
}
#endif
/* 6 for max size of port */
listen_url_size = strlen("http://") + strlen(config->hostname) +
......@@ -494,6 +380,23 @@ static void source_init (source_t *source)
source->burst_on_connect = config->burst_on_connect;
config_release_config();
/* maybe better in connection.c */
if ((str = httpp_getvar(source->parser, "ice-public")))
source->yp_public = atoi(str);
if ((str = httpp_getvar(source->parser, "icy-pub")))
source->yp_public = atoi(str);
if (str == NULL)
str = "0";
stats_event (source->mount, "public", str);
str = httpp_getvar(source->parser, "ice-audio-info");
source->audio_info = util_dict_new();
if (str)
{
_parse_audio_info (source, str);
stats_event (source->mount, "audio_info", str);
}
stats_event (source->mount, "listenurl", listenurl);
if (listenurl) {
......@@ -546,6 +449,8 @@ static void source_init (source_t *source)
avl_tree_unlock(global.source_tree);
}
if (source->yp_public)
yp_add (source);
}
......@@ -858,12 +763,6 @@ done:
source->running = 0;
INFO1("Source \"%s\" exiting", source->mount);
#ifdef USE_YP
if(source->yp_public) {
yp_remove(source);
}
#endif
/* we have de-activated the source now, so no more clients will be
* added, now move the listeners we have to the fallback (if any)
*/
......@@ -931,35 +830,36 @@ static int _free_client(void *key)
return 1;
}
static int _parse_audio_info(source_t *source, char *s)
static void _parse_audio_info (source_t *source, const char *s)
{
char *token = NULL;
char *pvar = NULL;
char *variable = NULL;
char *value = NULL;
while ((token = strtok(s,";")) != NULL) {
pvar = strchr(token, '=');
if (pvar) {
variable = (char *)malloc(pvar-token+1);
strncpy(variable, token, pvar-token);
variable[pvar-token] = 0;
pvar++;
if (strlen(pvar)) {
value = util_url_unescape(pvar);
util_dict_set(source->audio_info, variable, value);
stats_event(source->mount, variable, value);
if (value) {
free(value);
}
}
if (variable) {
free(variable);
const char *start = s;
unsigned len;
while (start != NULL && *start != '\0')
{
if ((s = strchr (start, ';')) == NULL)
len = strlen (start);
else
{
len = (int)(s - start);
s++; /* skip passed the ';' */
}
if (len)
{
char name[100], value[100];
char *esc;
sscanf (start, "%199[^=]=%199[^;\r\n]", name, value);
esc = util_url_unescape (value);
if (esc)
{
util_dict_set (source->audio_info, name, esc);
stats_event (source->mount, name, value);
free (esc);
}
}
s = NULL;
start = s;
}
return 1;
}
......
......@@ -43,7 +43,6 @@ typedef struct source_tag
avl_tree *pending_tree;
rwlock_t *shutdown_rwlock;
ypdata_t *ypdata[MAX_YP_DIRECTORIES];
util_dict *audio_info;
char *dumpfilename; /* Name of a file to dump incoming stream to */
......
This diff is collapsed.
......@@ -28,39 +28,25 @@
struct source_tag;
#define YP_ADD_ALL -1
typedef struct ypdata_tag
{
char *sid;
char *server_name;
char *server_desc;
char *server_genre;
char *cluster_password;
char *server_url;
char *listen_url;
char *bitrate;
char *audio_info;
char *server_type;
char *current_song;
char *yp_url;
int yp_url_timeout;
long yp_last_touch;
int yp_touch_interval;
} ypdata_t;
void *yp_touch_thread(void *arg);
int yp_add(struct source_tag *source, int which);
int yp_touch();
int yp_remove(struct source_tag *psource);
ypdata_t *yp_create_ypdata();
void yp_destroy_ypdata(ypdata_t *ypdata);
void add_yp_info(struct source_tag *source, char *stat_name, void *info,
int type);
#ifdef USE_YP
void yp_add (struct source_tag *source);
void yp_remove (const char *mount);
void yp_touch (const char *mount);
void yp_recheck_config (ice_config_t *config);
#else
#define yp_recheck_config(x) do{}while(0)
#endif
void yp_initialize();
void yp_shutdown();
#else
#define yp_add(x) do{}while(0)
#define yp_remove(x) do{}while(0)
#define yp_touch(x) do{}while(0)
#define yp_recheck_config(x) do{}while(0)
#define yp_initialize() do{}while(0)
#define yp_shutdown() do{}while(0)
#endif /* USE_YP */
#endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment