yp.c 27.4 KB
Newer Older
1 2 3 4 5
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 15 16 17
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

18 19
#include <stdio.h>
#include <string.h>
20
#include <stdlib.h>
21
#include <curl/curl.h>
22

Marvin Scholz's avatar
Marvin Scholz committed
23
#include "common/thread/thread.h"
24 25 26 27 28 29 30

#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "logging.h"
#include "format.h"
#include "source.h"
31
#include "cfgfile.h"
32
#include "stats.h"
33 34 35 36

#ifdef WIN32
#define snprintf _snprintf
#endif
37

38
#define CATMODULE "yp"
39

40 41 42
struct yp_server
{
    char        *url;
43
    char        *server_id;
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
    unsigned    url_timeout;
    unsigned    touch_interval;
    int         remove;

    CURL *curl;
    struct ypdata_tag *mounts, *pending_mounts;
    struct yp_server *next;
    char curl_error[CURL_ERROR_SIZE];
};



/* State for the listing of one mountpoint on one YP server. */
typedef struct ypdata_tag
{
    int remove;             /* non-zero: delete_marked_yp() unlinks and frees this entry */
    int release;            /* set by yp_remove(); process_ypdata() then issues a remove */
    int cmd_ok;             /* set by the header callback when "YPResponse: 1" is seen */

    char *sid;              /* value of the "SID: " header returned for an add request */
    char *mount;            /* mountpoint name, the key used by find_yp_mount() */
    char *url;
    char *listen_url;
    char *server_name;
    char *server_desc;
    char *server_genre;
    char *cluster_password;
    char *bitrate;
    char *audio_info;
    char *server_type;
    char *current_song;
    char *subtype;

    struct yp_server *server;   /* server this entry belongs to */
    time_t next_update;         /* process_ypdata() does nothing while now < next_update */
    unsigned touch_interval;    /* interval between touches, may be replaced by "TouchFreq: " */
    char *error_msg;            /* value of the last "YPMessage: " header, if any */
    /* request builder to run next: do_yp_add, do_yp_touch or do_yp_remove */
    int (*process)(struct ypdata_tag *yp, char *s, unsigned len);

    struct ypdata_tag *next;    /* next entry on the same server list */
} ypdata_t;


/* yp_lock guards the server lists; yp_pending_lock serialises yp_add() callers */
static rwlock_t yp_lock;
static mutex_t yp_pending_lock;

/* servers in use by the YP thread, and servers waiting for check_servers() */
static volatile struct yp_server *active_yps = NULL, *pending_yps = NULL;
/* non-zero asks the YP thread to run its list maintenance pass */
static volatile int yp_update = 0;
/* the YP thread loops while this is non-zero */
static int yp_running;
/* timestamp shared by the scheduling code, refreshed in yp_process_server() */
static time_t now;
static thread_type *yp_thread;
/* copied from the configuration by yp_recheck_config() */
static volatile unsigned client_limit = 0;
static volatile char *server_version = NULL;

static void *yp_update_thread(void *arg);
static void add_yp_info(ypdata_t *yp, void *info, int type);
static int do_yp_remove(ypdata_t *yp, char *s, unsigned len);
static int do_yp_add(ypdata_t *yp, char *s, unsigned len);
static int do_yp_touch(ypdata_t *yp, char *s, unsigned len);
static void add_pending_yp (struct yp_server *server);
static void delete_marked_yp(struct yp_server *server);
static void yp_destroy_ypdata(ypdata_t *ypdata);


/* curl callback used to parse headers coming back from the YP server */
108
static size_t handle_returned_header (void *ptr, size_t size, size_t nmemb, void *stream)
109 110
{
    ypdata_t *yp = stream;
111
    size_t bytes = size * nmemb;
112

113
    /* ICECAST_LOG_DEBUG("header from YP is \"%.*s\"", bytes, ptr); */
114
    if (strncasecmp (ptr, "YPResponse: 1", 13) == 0)
115 116
        yp->cmd_ok = 1;

117
    if (strncasecmp (ptr, "YPMessage: ", 11) == 0)
118
    {
119
        size_t len = bytes - 11;
120 121 122
        free (yp->error_msg);
        yp->error_msg = calloc (1, len);
        if (yp->error_msg)
123
            sscanf (ptr + 11, "%[^\r\n]", yp->error_msg);
124 125 126 127
    }

    if (yp->process == do_yp_add)
    {
128
        if (strncasecmp (ptr, "SID: ", 5) == 0)
129
        {
130
            size_t len = bytes - 5;
131 132 133
            free (yp->sid);
            yp->sid = calloc (1, len);
            if (yp->sid)
134
                sscanf (ptr + 5, "%[^\r\n]", yp->sid);
135
        }
136
    }
137
    if (strncasecmp (ptr, "TouchFreq: ", 11) == 0)
138 139
    {
        unsigned secs;
140 141
        if ( sscanf (ptr + 11, "%u", &secs) != 1 )
            secs = 0;
142 143
        if (secs < 30)
            secs = 30;
144
        ICECAST_LOG_DEBUG("server touch interval is %u", secs);
145
        yp->touch_interval = secs;
146
    }
147
    return (size_t)bytes;
148 149 150 151
}


/* curl write callback: no body is expected from the YP server, so whatever
 * arrives is ignored.  The full chunk size is returned so that curl treats
 * the data as consumed.
 */
static size_t handle_returned_data (void *ptr, size_t size, size_t nmemb, void *stream)
{
    (void)ptr;
    (void)stream;
    return size * nmemb;
}


/* search the active and pending YP server lists */
static struct yp_server *find_yp_server (const char *url)
{
    struct yp_server *server;

163
    server = (struct yp_server *)active_yps;
164 165 166 167 168 169
    while (server)
    {
        if (strcmp (server->url, url) == 0)
            return server;
        server = server->next;
    }
170
    server = (struct yp_server *)pending_yps;
171 172 173 174 175 176 177 178 179 180 181 182
    while (server)
    {
        if (strcmp (server->url, url) == 0)
            break;
        server = server->next;
    }
    return server;
}


static void destroy_yp_server (struct yp_server *server)
{
183 184
    ypdata_t *yp;

185 186
    if (server == NULL)
        return;
187
    ICECAST_LOG_DEBUG("Removing YP server entry for %s", server->url);
188 189 190 191 192 193 194 195 196 197 198 199 200 201

    /* delete yps:
     * first move all pendings into main queue.
     * then mark all main queue entries for deleting.
     * then remove all marked entries.
     */
    add_pending_yp(server);
    yp = server->mounts;
    while (yp) {
        yp->remove = 1;
        yp = yp->next;
    }
    delete_marked_yp(server);

202 203
    if (server->curl)
        curl_easy_cleanup (server->curl);
204 205
    if (server->mounts) ICECAST_LOG_WARN("active ypdata not freed");
    if (server->pending_mounts) ICECAST_LOG_WARN("pending ypdata not freed");
206
    free (server->url);
207
    free (server->server_id);
208 209 210 211 212 213
    free (server);
}



/* search for a ypdata entry corresponding to a specific mountpoint */
214
static ypdata_t *find_yp_mount (ypdata_t *mounts, const char *mount)
215
{
216
    ypdata_t *yp = mounts;
217 218 219 220 221 222 223 224 225
    while (yp)
    {
        if (strcmp (yp->mount, mount) == 0)
            break;
        yp = yp->next;
    }
    return yp;
}

226 227 228

/* Bring the YP server lists in line with `config`.
 *
 * Every active server is first flagged for removal; servers still named in
 * the configuration get the flag cleared again.  URLs not seen before get a
 * new server with its own curl handle, queued on the pending list.  The YP
 * thread is then asked (yp_update) to apply the changes.
 *
 * If a new server cannot be set up (out of memory, curl handle creation
 * failed) the remaining configured URLs are skipped.
 */
void yp_recheck_config (ice_config_t *config)
{
    int i;
    struct yp_server *server;

    ICECAST_LOG_DEBUG("Updating YP configuration");
    thread_rwlock_rlock (&yp_lock);

    for (server = (struct yp_server *)active_yps; server; server = server->next)
        server->remove = 1;

    client_limit = config->client_limit;
    free ((char*)server_version);
    server_version = strdup (config->server_id);

    /* for each yp url in config, check to see if one exists
       if not, then add it. */
    for (i = 0; i < config->num_yp_directories; i++)
    {
        server = find_yp_server (config->yp_url[i]);
        if (server)
        {
            server->remove = 0;
            continue;
        }

        server = calloc (1, sizeof (*server));
        if (server == NULL)
            break;

        if (server_version)
            server->server_id = strdup ((char *)server_version);
        server->url = strdup (config->yp_url[i]);
        if (server->server_id == NULL || server->url == NULL)
        {
            /* nothing but these strings is attached yet, release them here */
            free (server->server_id);
            free (server->url);
            free (server);
            break;
        }
        server->url_timeout = config->yp_url_timeout[i];
        server->touch_interval = config->yp_touch_interval[i];
        server->curl = curl_easy_init();
        if (server->curl == NULL)
        {
            destroy_yp_server (server);
            break;
        }
        if (server->url_timeout > 10 || server->url_timeout < 1)
            server->url_timeout = 6;
        if (server->touch_interval < 30)
            server->touch_interval = 30;

        curl_easy_setopt (server->curl, CURLOPT_USERAGENT, server->server_id);
        curl_easy_setopt (server->curl, CURLOPT_URL, server->url);
        curl_easy_setopt (server->curl, CURLOPT_HEADERFUNCTION, handle_returned_header);
        curl_easy_setopt (server->curl, CURLOPT_WRITEFUNCTION, handle_returned_data);
        curl_easy_setopt (server->curl, CURLOPT_WRITEDATA, server->curl);
        /* curl reads this option as a long, so convert explicitly */
        curl_easy_setopt (server->curl, CURLOPT_TIMEOUT, (long)server->url_timeout);
        curl_easy_setopt (server->curl, CURLOPT_NOSIGNAL, 1L);
        curl_easy_setopt (server->curl, CURLOPT_FOLLOWLOCATION, 1L);
        curl_easy_setopt (server->curl, CURLOPT_MAXREDIRS, 3L);
        curl_easy_setopt (server->curl, CURLOPT_ERRORBUFFER, &(server->curl_error[0]));

        server->next = (struct yp_server *)pending_yps;
        pending_yps = server;
        ICECAST_LOG_INFO("Adding new YP server \"%s\" (timeout %us, default interval %us)",
                server->url, server->url_timeout, server->touch_interval);
    }
    thread_rwlock_unlock (&yp_lock);
    yp_update = 1;
}

296

297
void yp_initialize(void)
298
{
299
    ice_config_t *config = config_get_config();
300 301
    thread_rwlock_create (&yp_lock);
    thread_mutex_create (&yp_pending_lock);
302 303
    yp_recheck_config (config);
    config_release_config ();
304 305
    yp_thread = thread_create("YP Touch Thread", yp_update_thread,
                            (void *)NULL, THREAD_ATTACHED);
306
}
307 308 309



310
/* handler for curl, checks if successful handling occurred
311 312
 * return 0 for ok, -1 for this entry failed, -2 for server fail.
 * On failure case, update and process are modified
313
 */
314
static int send_to_yp (const char *cmd, ypdata_t *yp, char *post)
315
{
316 317 318
    int curlcode;
    struct yp_server *server = yp->server;

319
    /* ICECAST_LOG_DEBUG("send YP (%s):%s", cmd, post); */
320 321 322 323 324 325 326
    yp->cmd_ok = 0;
    curl_easy_setopt (server->curl, CURLOPT_POSTFIELDS, post);
    curl_easy_setopt (server->curl, CURLOPT_WRITEHEADER, yp);
    curlcode = curl_easy_perform (server->curl);
    if (curlcode)
    {
        yp->process = do_yp_add;
327
        yp->next_update = now + 1200;
328
        ICECAST_LOG_ERROR("connection to %s failed with \"%s\"", server->url, server->curl_error);
329
        return -2;
330
    }
331 332
    if (yp->cmd_ok == 0)
    {
333 334
        if (yp->error_msg == NULL)
            yp->error_msg = strdup ("no response from server");
335 336
        if (yp->process == do_yp_add)
        {
337
            ICECAST_LOG_ERROR("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
338
            yp->next_update = now + 7200;
339
        }
340
        if (yp->process == do_yp_touch)
341
        {
342 343 344 345 346 347 348
            /* At this point the touch request failed, either because they rejected our session
             * or the server isn't accessible. This means we have to wait before doing another
             * add request. We have a minimum delay but we could allow the directory server to
             * give us a wait time using the TouchFreq header. This time could be given in such
             * cases as a firewall block or incorrect listenurl.
             */
            if (yp->touch_interval < 1200)
349
                yp->next_update = now + 1200;
350
            else
351
                yp->next_update = now + yp->touch_interval;
352
            ICECAST_LOG_INFO("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
353
        }
354
        yp->process = do_yp_add;
355
        free(yp->sid);
356
        yp->sid = NULL;
357
        return -1;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
358
    }
359
    ICECAST_LOG_DEBUG("YP %s at %s succeeded", cmd, server->url);
360 361 362 363 364
    return 0;
}


/* routines for building and issues requests to the YP server */
365
static int do_yp_remove (ypdata_t *yp, char *s, unsigned len)
366
{
367 368
    int ret = 0;

369 370
    if (yp->sid)
    {
371
        ret = snprintf (s, len, "action=remove&sid=%s", yp->sid);
372 373 374
        if (ret >= (signed)len)
            return ret+1;

375
        ICECAST_LOG_INFO("clearing up YP entry for %s", yp->mount);
376
        ret = send_to_yp ("remove", yp, s);
377 378
        free (yp->sid);
        yp->sid = NULL;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
379
    }
380 381
    yp->remove = 1;
    yp->process = do_yp_add;
382
    yp_update = 1;
383

384
    return ret;
385 386
}

387

388
/* Build and send the "add" request for `yp`.
 *
 * The entry's details are refreshed from the stats of its mountpoint first.
 *
 *   s, len  scratch buffer for the request body and its size
 *
 * Returns a positive value, the buffer size needed, when `s` is too small.
 * Otherwise returns the send_to_yp() result; on 0 the entry switches to
 * do_yp_touch with the first touch due in 5 seconds.
 */
static int do_yp_add (ypdata_t *yp, char *s, unsigned len)
{
    int ret;
    char *value;
    char *admin;
    ice_config_t *config;

    config = config_get_config();
    admin = util_url_escape (config->admin);
    config_release_config();

    value = stats_get_value (yp->mount, "server_type");
    add_yp_info (yp, value, YP_SERVER_TYPE);
    free (value);

    value = stats_get_value (yp->mount, "server_name");
    add_yp_info (yp, value, YP_SERVER_NAME);
    free (value);

    value = stats_get_value (yp->mount, "server_url");
    add_yp_info (yp, value, YP_SERVER_URL);
    free (value);

    value = stats_get_value (yp->mount, "genre");
    add_yp_info (yp, value, YP_SERVER_GENRE);
    free (value);

    /* the bitrate may be published under either name */
    value = stats_get_value (yp->mount, "bitrate");
    if (value == NULL)
        value = stats_get_value (yp->mount, "ice-bitrate");
    add_yp_info (yp, value, YP_BITRATE);
    free (value);

    value = stats_get_value (yp->mount, "server_description");
    add_yp_info (yp, value, YP_SERVER_DESC);
    free (value);

    value = stats_get_value (yp->mount, "subtype");
    add_yp_info (yp, value, YP_SUBTYPE);
    free (value);

    value = stats_get_value (yp->mount, "audio_info");
    add_yp_info (yp, value, YP_AUDIO_INFO);
    free (value);

    /* other callers check util_url_escape() for NULL, so never hand a
     * NULL admin to %s; send an empty value instead */
    ret = snprintf (s, len, "action=add&admin=%s&sn=%s&genre=%s&cpswd=%s&desc="
                    "%s&url=%s&listenurl=%s&type=%s&stype=%s&b=%s&%s\r\n",
                    admin ? admin : "",
                    yp->server_name, yp->server_genre, yp->cluster_password,
                    yp->server_desc, yp->url, yp->listen_url,
                    yp->server_type, yp->subtype, yp->bitrate, yp->audio_info);
    free (admin);

    if (ret >= (signed)len)
        return ret+1;
    ret = send_to_yp ("add", yp, s);
    if (ret == 0)
    {
        yp->process = do_yp_touch;
        /* force first touch in 5 secs */
        yp->next_update = time(NULL) + 5;
    }
    return ret;
}


454
/* Build and send the "touch" request for `yp`.
 *
 * The current song ("artist - title", or just the title when no artist is
 * known), the listener counts and the subtype are taken from the stats of
 * the mountpoint.  The song is also published as the "yp_currently_playing"
 * stat.
 *
 *   s, len  scratch buffer for the request body and its size
 *
 * Returns a positive value, the buffer size needed, when `s` is too small;
 * 0 when the touch was accepted (the next one is scheduled touch_interval
 * later); -1 when the request failed.
 */
static int do_yp_touch (ypdata_t *yp, char *s, unsigned len)
{
    unsigned listeners = 0, max_listeners = 1;
    char *val, *artist, *title;
    int ret;

    artist = stats_get_value (yp->mount, "artist");
    title = stats_get_value (yp->mount, "title");
    if (artist || title)
    {
        /* missing parts become empty strings, no extra allocation needed */
        const char *artist_text = artist ? artist : "";
        const char *separator = artist ? " - " : "";
        const char *title_text = title ? title : "";
        size_t song_len = strlen (artist_text) + strlen (separator) + strlen (title_text) + 1;
        char *song = malloc (song_len);

        if (song)
        {
            snprintf (song, song_len, "%s%s%s", artist_text, separator, title_text);
            add_yp_info (yp, song, YP_CURRENT_SONG);
            stats_event (yp->mount, "yp_currently_playing", song);
            free (song);
        }
    }
    free (artist);
    free (title);

    val = stats_get_value (yp->mount, "listeners");
    if (val)
    {
        listeners = atoi (val);
        free (val);
    }

    /* no usable per-mount limit means the global client limit applies */
    val = stats_get_value (yp->mount, "max_listeners");
    if (val == NULL || strcmp (val, "unlimited") == 0)
        max_listeners = client_limit;
    else
    {
        int limit = atoi (val);

        if (limit < 0)
            max_listeners = client_limit;
        else
            max_listeners = limit;
    }
    free (val);

    val = stats_get_value (yp->mount, "subtype");
    if (val)
    {
        add_yp_info (yp, val, YP_SUBTYPE);
        free (val);
    }

    ret = snprintf (s, len, "action=touch&sid=%s&st=%s"
            "&listeners=%u&max_listeners=%u&stype=%s\r\n",
            yp->sid, yp->current_song, listeners, max_listeners, yp->subtype);

    if (ret >= (signed)len)
        return ret+1; /* space required for above text and nul*/

    if (send_to_yp ("touch", yp, s) == 0)
    {
        yp->next_update = now + yp->touch_interval;
        return 0;
    }
    return -1;
}
518

519 520


521
/* Run the pending request of `yp` if it is due.
 *
 * The request builders report a too small buffer by returning the size they
 * need, in which case the buffer is grown and the request retried.
 *
 *   server  not used here
 *   yp      entry to process
 *
 * Returns 0 when nothing was due or memory ran out, otherwise the zero or
 * negative result of the request builder.
 */
static int process_ypdata(struct yp_server *server, ypdata_t *yp)
{
    unsigned len = 1024;
    char *s = NULL;

    (void)server;
    if (now < yp->next_update)
        return 0;

    /* loop just in case the memory area isn't big enough */
    for (;;)
    {
        int ret;
        char *tmp = realloc (s, len);

        if (tmp == NULL)
        {
            /* a failed realloc leaves the old buffer allocated */
            free (s);
            return 0;
        }
        s = tmp;

        /* a released entry only has its removal left to do */
        if (yp->release)
        {
            yp->process = do_yp_remove;
            yp->next_update = 0;
        }

        ret = yp->process (yp, s, len);
        if (ret <= 0)
        {
            free (s);
            return ret;
        }
        len = ret;
    }
}
553 554 555


static void yp_process_server (struct yp_server *server)
556
{
557
    ypdata_t *yp;
558
    int state = 0;
559

560
    /* ICECAST_LOG_DEBUG("processing yp server %s", server->url); */
561 562 563 564
    yp = server->mounts;
    while (yp)
    {
        now = time (NULL);
565 566 567 568 569
        /* if one of the streams shows that the server cannot be contacted then mark the
         * other entries for an update later. Assume YP server is dead and skip it for now
         */
        if (state == -2)
        {
570
            ICECAST_LOG_DEBUG("skiping %s on %s", yp->mount, server->url);
571 572 573 574 575
            yp->process = do_yp_add;
            yp->next_update += 900;
        }
        else
            state = process_ypdata (server, yp);
576 577 578 579 580 581
        yp = yp->next;
    }
}



582
static ypdata_t *create_yp_entry (const char *mount)
583 584
{
    ypdata_t *yp;
585
    char *s;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
586

587 588 589 590 591 592
    yp = calloc (1, sizeof (ypdata_t));
    do
    {
        unsigned len = 512;
        int ret;
        char *url;
593
        mount_proxy *mountproxy = NULL;
594 595 596 597
        ice_config_t *config;

        if (yp == NULL)
            break;
598
        yp->mount = strdup (mount);
599 600 601 602 603 604 605 606 607
        yp->server_name = strdup ("");
        yp->server_desc = strdup ("");
        yp->server_genre = strdup ("");
        yp->bitrate = strdup ("");
        yp->server_type = strdup ("");
        yp->cluster_password = strdup ("");
        yp->url = strdup ("");
        yp->current_song = strdup ("");
        yp->audio_info = strdup ("");
608
        yp->subtype = strdup ("");
609 610 611 612 613 614
        yp->process = do_yp_add;

        url = malloc (len);
        if (url == NULL)
            break;
        config = config_get_config();
615
        ret = snprintf (url, len, "http://%s:%d%s", config->hostname, config->port, mount);
616
        if (ret >= (signed)len)
617
        {
618 619
            s = realloc (url, ++ret);
            if (s) url = s;
620
            snprintf (url, ret, "http://%s:%d%s", config->hostname, config->port, mount);
621
        }
622

623
        mountproxy = config_find_mount (config, mount, MOUNT_TYPE_NORMAL);
624
        if (mountproxy && mountproxy->cluster_password)
625
            add_yp_info (yp, mountproxy->cluster_password, YP_CLUSTER_PASSWORD);
626
        config_release_config();
627

628 629 630 631 632 633 634 635 636 637 638 639 640 641
        yp->listen_url = util_url_escape (url);
        free (url);
        if (yp->listen_url == NULL)
            break;

        return yp;
    } while (0);

    yp_destroy_ypdata (yp);
    return NULL;
}


/* Check for changes in the YP servers configured */
642
static void check_servers (void)
643
{
644 645
    struct yp_server *server = (struct yp_server *)active_yps,
                     **server_p = (struct yp_server **)&active_yps;
646 647 648 649 650 651

    while (server)
    {
        if (server->remove)
        {
            struct yp_server *to_go = server;
652
            ICECAST_LOG_DEBUG("YP server \"%s\"removed", server->url);
653 654
            *server_p = server->next;
            server = server->next;
655
            destroy_yp_server(to_go);
656
            continue;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
657
        }
658 659 660 661 662 663 664 665
        server_p = &server->next;
        server = server->next;
    }
    /* add new server entries */
    while (pending_yps)
    {
        avl_node *node;

666
        server = (struct yp_server *)pending_yps;
667 668
        pending_yps = server->next;

669
        ICECAST_LOG_DEBUG("Add pending yps %s", server->url);
670
        server->next = (struct yp_server *)active_yps;
671 672 673 674 675 676 677 678 679 680
        active_yps = server;

        /* new YP server configured, need to populate with existing sources */
        avl_tree_rlock (global.source_tree);
        node = avl_get_first (global.source_tree);
        while (node)
        {
            ypdata_t *yp;

            source_t *source = node->key;
681
            if (source->yp_public && (yp = create_yp_entry (source->mount)) != NULL)
682
            {
683
                ICECAST_LOG_DEBUG("Adding existing mount %s", source->mount);
684 685 686 687
                yp->server = server;
                yp->touch_interval = server->touch_interval;
                yp->next = server->mounts;
                server->mounts = yp;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
688
            }
689
            node = avl_get_next (node);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
690
        }
691
        avl_tree_unlock (global.source_tree);
692
    }
693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712
}


/* Move the pending entries of `server` to the front of its active list,
 * leaving the pending list empty.  Does nothing when no entry is pending.
 */
static void add_pending_yp (struct yp_server *server)
{
    ypdata_t *last = server->pending_mounts;
    unsigned count = 1;

    if (last == NULL)
        return;

    /* find the tail of the pending list, counting entries on the way */
    while (last->next)
    {
        last = last->next;
        count++;
    }
    last->next = server->mounts;
    server->mounts = server->pending_mounts;
    server->pending_mounts = NULL;
    ICECAST_LOG_DEBUG("%u YP entries added to %s", count, server->url);
}

718

719
static void delete_marked_yp(struct yp_server *server)
720 721
{
    ypdata_t *yp = server->mounts, **prev = &server->mounts;
722

723 724 725 726 727
    while (yp)
    {
        if (yp->remove)
        {
            ypdata_t *to_go = yp;
728
            ICECAST_LOG_DEBUG("removed %s from YP server %s", yp->mount, server->url);
729 730
            *prev = yp->next;
            yp = yp->next;
731
            yp_destroy_ypdata(to_go);
732 733 734 735 736
            continue;
        }
        prev = &yp->next;
        yp = yp->next;
    }
737
}
738 739 740


static void *yp_update_thread(void *arg)
741
{
742
    ICECAST_LOG_INFO("YP update thread started");
743 744 745 746 747 748 749 750 751 752

    yp_running = 1;
    while (yp_running)
    {
        struct yp_server *server;

        thread_sleep (200000);

        /* do the YP communication */
        thread_rwlock_rlock (&yp_lock);
753
        server = (struct yp_server *)active_yps;
754 755
        while (server)
        {
756
            /* ICECAST_LOG_DEBUG("trying %s", server->url); */
757 758
            yp_process_server (server);
            server = server->next;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
759
        }
760 761 762 763 764 765 766
        thread_rwlock_unlock (&yp_lock);

        /* update the local YP structure */
        if (yp_update)
        {
            thread_rwlock_wlock (&yp_lock);
            check_servers ();
767
            server = (struct yp_server *)active_yps;
768 769
            while (server)
            {
770
                /* ICECAST_LOG_DEBUG("Checking yps %s", server->url); */
771 772 773
                add_pending_yp (server);
                delete_marked_yp (server);
                server = server->next;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
774
            }
775 776
            yp_update = 0;
            thread_rwlock_unlock (&yp_lock);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
777 778
        }
    }
779 780 781 782 783
    thread_rwlock_destroy (&yp_lock);
    thread_mutex_destroy (&yp_pending_lock);
    /* free server and ypdata left */
    while (active_yps)
    {
784
        struct yp_server *server = (struct yp_server *)active_yps;
785 786 787
        active_yps = server->next;
        destroy_yp_server (server);
    }
788

789
    return NULL;
790
}
791

792 793 794


/* Free a ypdata entry together with all the strings it owns.  NULL is
 * accepted and ignored.  The entry must already be unlinked from its list.
 */
static void yp_destroy_ypdata(ypdata_t *ypdata)
{
    if (ypdata == NULL)
        return;

    /* free() accepts NULL, so unset fields need no special casing */
    free (ypdata->mount);
    free (ypdata->url);
    free (ypdata->sid);
    free (ypdata->server_name);
    free (ypdata->server_desc);
    free (ypdata->server_genre);
    free (ypdata->cluster_password);
    free (ypdata->listen_url);
    free (ypdata->current_song);
    free (ypdata->bitrate);
    free (ypdata->server_type);
    free (ypdata->audio_info);
    free (ypdata->subtype);
    free (ypdata->error_msg);
    free (ypdata);
}
838

839
/* Store a URL-escaped copy of `info` in the field of `yp` selected by
 * `type` (one of the YP_* constants), freeing the previous value.
 *
 * The call changes nothing when `info` is NULL, when escaping fails or
 * when `type` is not recognised.  `info` itself remains owned by the caller.
 */
static void add_yp_info(ypdata_t *yp, void *info, int type)
{
    char **field = NULL;
    char *escaped;

    if (!info)
        return;

    escaped = util_url_escape(info);
    if (escaped == NULL)
        return;

    switch (type)
    {
        case YP_SERVER_NAME:      field = &yp->server_name;      break;
        case YP_SERVER_DESC:      field = &yp->server_desc;      break;
        case YP_SERVER_GENRE:     field = &yp->server_genre;     break;
        case YP_SERVER_URL:       field = &yp->url;              break;
        case YP_BITRATE:          field = &yp->bitrate;          break;
        case YP_AUDIO_INFO:       field = &yp->audio_info;       break;
        case YP_SERVER_TYPE:      field = &yp->server_type;      break;
        case YP_CURRENT_SONG:     field = &yp->current_song;     break;
        case YP_CLUSTER_PASSWORD: field = &yp->cluster_password; break;
        case YP_SUBTYPE:          field = &yp->subtype;          break;
        default:
            break;
    }

    if (field == NULL)
    {
        /* unknown type, nothing takes ownership of the copy */
        free (escaped);
        return;
    }
    free (*field);
    *field = escaped;
}


/* Add YP entries to active servers */
899
void yp_add (const char *mount)
900 901 902 903 904 905 906 907
{
    struct yp_server *server;

    /* make sure YP thread is not modifying the lists */
    thread_rwlock_rlock (&yp_lock);

    /* make sure we don't race against another yp_add */
    thread_mutex_lock (&yp_pending_lock);
908
    server = (struct yp_server *)active_yps;
909 910 911
    while (server)
    {
        ypdata_t *yp;
912 913 914 915

        /* on-demand relays may already have a YP entry */
        yp = find_yp_mount (server->mounts, mount);
        if (yp == NULL)
916
        {
917 918 919 920
            /* add new ypdata to each servers pending yp */
            yp = create_yp_entry (mount);
            if (yp)
            {
921
                ICECAST_LOG_DEBUG("Adding %s to %s", mount, server->url);
922 923 924
                yp->server = server;
                yp->touch_interval = server->touch_interval;
                yp->next = server->pending_mounts;
925
                yp->next_update = time(NULL) + 60;
926 927 928
                server->pending_mounts = yp;
                yp_update = 1;
            }
929
        }
930
        else
931
            ICECAST_LOG_DEBUG("YP entry %s already exists", mount);
932 933 934 935 936 937 938 939 940 941 942
        server = server->next;
    }
    thread_mutex_unlock (&yp_pending_lock);
    thread_rwlock_unlock (&yp_lock);
}



/* Mark an existing entry in the YP list as to be marked for deletion */
void yp_remove (const char *mount)
{
943
    struct yp_server *server = (struct yp_server *)active_yps;
944 945 946 947

    thread_rwlock_rlock (&yp_lock);
    while (server)
    {
948 949 950
        ypdata_t *list = server->mounts;

        while (1)
951
        {
952 953 954 955 956 957 958 959
            ypdata_t *yp = find_yp_mount (list, mount);
            if (yp == NULL)
                break;
            if (yp->release || yp->remove)
            {
                list = yp->next;
                continue;   /* search again these are old entries */
            }
960
            ICECAST_LOG_DEBUG("release %s on YP %s", mount, server->url);
961
            yp->release = 1;
962
            yp->next_update = 0;
963
        }
964
        server = server->next;
965
    }
966
    thread_rwlock_unlock (&yp_lock);
967
}
968 969 970 971 972 973


/* This is similar to yp_remove, but we force a touch
 * attempt */
void yp_touch (const char *mount)
{
974
    struct yp_server *server = (struct yp_server *)active_yps;
975
    ypdata_t *search_list = NULL;
976 977

    thread_rwlock_rlock (&yp_lock);
978 979 980
    if (server)
        search_list = server->mounts;

981 982
    while (server)
    {
983
        ypdata_t *yp = find_yp_mount (search_list, mount);
984 985
        if (yp)
        {
986
            /* we may of found old entries not purged yet, so skip them */
987
            if (yp->release != 0 || yp->remove != 0)
988 989 990 991
            {
                search_list = yp->next;
                continue;
            }
992 993 994
            /* don't update the directory if there is a touch scheduled soon */
            if (yp->process == do_yp_touch && now + yp->touch_interval - yp->next_update > 60)
                yp->next_update = now + 3;
995 996
        }
        server = server->next;
997 998
        if (server)
            search_list = server->mounts;
999 1000 1001 1002 1003
    }
    thread_rwlock_unlock (&yp_lock);
}


1004
void yp_shutdown (void)
1005 1006 1007
{
    yp_running = 0;
    yp_update = 1;
1008 1009
    if (yp_thread)
        thread_join (yp_thread);
1010
    curl_global_cleanup();
1011 1012
    free ((char*)server_version);
    server_version = NULL;
1013
    ICECAST_LOG_INFO("YP thread down");
1014 1015
}