/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <curl/curl.h>

#include "common/thread/thread.h"

#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "logging.h"
#include "format.h"
#include "source.h"
#include "cfgfile.h"
#include "stats.h"

/* On WIN32 builds map snprintf onto _snprintf. */
#ifdef WIN32
#define snprintf _snprintf
#endif

/* Module name for this file; presumably consumed by the ICECAST_LOG_*
 * macros -- confirm in logging.h. */
#define CATMODULE "yp"

struct yp_server
{
    char        *url;
43
    char        *server_id;
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
    unsigned    url_timeout;
    unsigned    touch_interval;
    int         remove;

    CURL *curl;
    struct ypdata_tag *mounts, *pending_mounts;
    struct yp_server *next;
    char curl_error[CURL_ERROR_SIZE];
};



/* State kept for one mountpoint as announced to one YP server. */
typedef struct ypdata_tag
{
    int remove;     /* non-zero: delete_marked_yp unlinks and frees this entry */
    int release;    /* set by yp_remove: the next pass sends a "remove" request */
    int cmd_ok;     /* set by the header callback on "YPResponse: 1" */

    char *sid;      /* value of the "SID: " header returned for an add, else NULL */
    char *mount;    /* mountpoint name, used as the lookup key */

    /* request fields; stored URL-escaped (see add_yp_info / create_yp_entry) */
    char *url;
    char *listen_url;
    char *server_name;
    char *server_desc;
    char *server_genre;
    char *cluster_password;
    char *bitrate;
    char *audio_info;
    char *server_type;
    char *current_song;
    char *subtype;

    struct yp_server *server;   /* server this entry is announced to */
    time_t next_update;         /* entry is skipped while the current time is below this */
    unsigned touch_interval;    /* seconds between touches, may be set via "TouchFreq: " */
    char *error_msg;            /* value of the last "YPMessage: " header, or NULL */

    /* builds and sends the next request into the buffer s of len bytes;
     * one of do_yp_add, do_yp_touch or do_yp_remove */
    int (*process)(struct ypdata_tag *yp, char *s, unsigned len);

    struct ypdata_tag *next;
} ypdata_t;


/* Read-locked while the lists are walked, write-locked by the YP thread
 * while it restructures them. */
static rwlock_t yp_lock;
/* Held by yp_add while it queues entries on a server's pending list. */
static mutex_t yp_pending_lock;

static volatile struct yp_server *active_yps = NULL, *pending_yps = NULL;
/* Non-zero asks the YP thread to run its list maintenance pass. */
static volatile int yp_update = 0;
static int yp_running;
/* Timestamp refreshed by yp_process_server before each entry is handled. */
static time_t now;
static thread_type *yp_thread;
static volatile unsigned client_limit = 0;
static volatile char *server_version = NULL;

static void *yp_update_thread(void *arg);
static void add_yp_info(ypdata_t *yp, void *info, int type);
static int do_yp_remove(ypdata_t *yp, char *s, unsigned len);
static int do_yp_add(ypdata_t *yp, char *s, unsigned len);
static int do_yp_touch(ypdata_t *yp, char *s, unsigned len);
static void add_pending_yp (struct yp_server *server);
static void delete_marked_yp(struct yp_server *server);
static void yp_destroy_ypdata(ypdata_t *ypdata);


/* curl callback used to parse headers coming back from the YP server */
108
static size_t handle_returned_header (void *ptr, size_t size, size_t nmemb, void *stream)
109 110
{
    ypdata_t *yp = stream;
111
    size_t bytes = size * nmemb;
112

113
    /* ICECAST_LOG_DEBUG("header from YP is \"%.*s\"", bytes, ptr); */
114
    if (strncasecmp (ptr, "YPResponse: 1", 13) == 0)
115 116
        yp->cmd_ok = 1;

117
    if (strncasecmp (ptr, "YPMessage: ", 11) == 0)
118
    {
119
        size_t len = bytes - 11;
120 121 122
        free (yp->error_msg);
        yp->error_msg = calloc (1, len);
        if (yp->error_msg)
123
            sscanf (ptr + 11, "%[^\r\n]", yp->error_msg);
124 125 126 127
    }

    if (yp->process == do_yp_add)
    {
128
        if (strncasecmp (ptr, "SID: ", 5) == 0)
129
        {
130
            size_t len = bytes - 5;
131 132 133
            free (yp->sid);
            yp->sid = calloc (1, len);
            if (yp->sid)
134
                sscanf (ptr + 5, "%[^\r\n]", yp->sid);
135
        }
136
    }
137
    if (strncasecmp (ptr, "TouchFreq: ", 11) == 0)
138 139
    {
        unsigned secs;
140 141
        if ( sscanf (ptr + 11, "%u", &secs) != 1 )
            secs = 0;
142 143
        if (secs < 30)
            secs = 30;
144
        ICECAST_LOG_DEBUG("server touch interval is %u", secs);
145
        yp->touch_interval = secs;
146
    }
147
    return (size_t)bytes;
148 149 150 151
}


/* curl write callback: no response body is expected, so whatever arrives is
 * reported as fully consumed and thrown away.
 * Returns size*nmemb.
 */
static size_t handle_returned_data (void *ptr, size_t size, size_t nmemb, void *stream)
{
    (void)ptr;
    (void)stream;
    return size * nmemb;
}


/* search the active and pending YP server lists */
static struct yp_server *find_yp_server (const char *url)
{
    struct yp_server *server;

163
    server = (struct yp_server *)active_yps;
164 165 166 167 168 169
    while (server)
    {
        if (strcmp (server->url, url) == 0)
            return server;
        server = server->next;
    }
170
    server = (struct yp_server *)pending_yps;
171 172 173 174 175 176 177 178 179 180 181 182
    while (server)
    {
        if (strcmp (server->url, url) == 0)
            break;
        server = server->next;
    }
    return server;
}


static void destroy_yp_server (struct yp_server *server)
{
183 184
    ypdata_t *yp;

185 186
    if (server == NULL)
        return;
187
    ICECAST_LOG_DEBUG("Removing YP server entry for %s", server->url);
188 189 190 191 192 193 194 195 196 197 198 199 200 201

    /* delete yps:
     * first move all pendings into main queue.
     * then mark all main queue entries for deleting.
     * then remove all marked entries.
     */
    add_pending_yp(server);
    yp = server->mounts;
    while (yp) {
        yp->remove = 1;
        yp = yp->next;
    }
    delete_marked_yp(server);

202 203
    if (server->curl)
        curl_easy_cleanup (server->curl);
204 205
    if (server->mounts) ICECAST_LOG_WARN("active ypdata not freed up");
    if (server->pending_mounts) ICECAST_LOG_WARN("pending ypdata not freed up");
206
    free (server->url);
207
    free (server->server_id);
208 209 210 211 212 213
    free (server);
}



/* search for a ypdata entry corresponding to a specific mountpoint */
214
static ypdata_t *find_yp_mount (ypdata_t *mounts, const char *mount)
215
{
216
    ypdata_t *yp = mounts;
217 218 219 220 221 222 223 224 225
    while (yp)
    {
        if (strcmp (yp->mount, mount) == 0)
            break;
        yp = yp->next;
    }
    return yp;
}

226 227 228

/* Bring the YP server lists in line with config.
 *
 * Every active server is first marked for removal; servers still listed in
 * config have the mark cleared, and servers not yet known are created and
 * placed on the pending list.  The YP thread is then asked (yp_update) to
 * apply the changes.
 */
void yp_recheck_config (ice_config_t *config)
{
    int i;
    struct yp_server *server;

    ICECAST_LOG_DEBUG("Updating YP configuration");
    thread_rwlock_rlock (&yp_lock);

    for (server = (struct yp_server *)active_yps; server; server = server->next)
        server->remove = 1;

    client_limit = config->client_limit;
    free ((char*)server_version);
    server_version = strdup (config->server_id);

    /* for each yp url in config, check to see if one exists
       if not, then add it. */
    for (i = 0; i < config->num_yp_directories; i++)
    {
        server = find_yp_server (config->yp_url[i]);
        if (server)
        {
            server->remove = 0;
            continue;
        }

        server = calloc (1, sizeof (*server));
        if (server == NULL)
            break;

        /* the copy above may have failed; never hand NULL to strdup */
        if (server_version)
            server->server_id = strdup ((char *)server_version);
        server->url = strdup (config->yp_url[i]);
        if (server->url == NULL)
        {
            /* find_yp_server compares URLs, so an entry without one must
             * never reach the lists */
            free (server->server_id);
            free (server);
            break;
        }
        server->url_timeout = config->yp_url_timeout[i];
        server->touch_interval = config->yp_touch_interval[i];
        server->curl = curl_easy_init();
        if (server->curl == NULL)
        {
            destroy_yp_server (server);
            break;
        }
        if (server->url_timeout > 10 || server->url_timeout < 1)
            server->url_timeout = 6;
        if (server->touch_interval < 30)
            server->touch_interval = 30;

        curl_easy_setopt (server->curl, CURLOPT_USERAGENT, server->server_id);
        curl_easy_setopt (server->curl, CURLOPT_URL, server->url);
        curl_easy_setopt (server->curl, CURLOPT_HEADERFUNCTION, handle_returned_header);
        curl_easy_setopt (server->curl, CURLOPT_WRITEFUNCTION, handle_returned_data);
        curl_easy_setopt (server->curl, CURLOPT_WRITEDATA, server->curl);
        /* curl reads this option as a long through varargs */
        curl_easy_setopt (server->curl, CURLOPT_TIMEOUT, (long)server->url_timeout);
        curl_easy_setopt (server->curl, CURLOPT_NOSIGNAL, 1L);
        curl_easy_setopt (server->curl, CURLOPT_FOLLOWLOCATION, 1L);
        curl_easy_setopt (server->curl, CURLOPT_MAXREDIRS, 3L);
        curl_easy_setopt (server->curl, CURLOPT_ERRORBUFFER, &(server->curl_error[0]));

        server->next = (struct yp_server *)pending_yps;
        pending_yps = server;
        ICECAST_LOG_INFO("Adding new YP server \"%s\" (timeout %us, default interval %us)",
                server->url, server->url_timeout, server->touch_interval);
    }
    thread_rwlock_unlock (&yp_lock);
    yp_update = 1;
}

void yp_initialize(void)
298
{
299
    ice_config_t *config = config_get_config();
300 301
    thread_rwlock_create (&yp_lock);
    thread_mutex_create (&yp_pending_lock);
302 303
    yp_recheck_config (config);
    config_release_config ();
304 305
    yp_thread = thread_create("YP Touch Thread", yp_update_thread,
                            (void *)NULL, THREAD_ATTACHED);
306
}
307 308 309



310
/* handler for curl, checks if successful handling occurred
311 312
 * return 0 for ok, -1 for this entry failed, -2 for server fail.
 * On failure case, update and process are modified
313
 */
314
static int send_to_yp (const char *cmd, ypdata_t *yp, char *post)
315
{
316 317 318
    int curlcode;
    struct yp_server *server = yp->server;

319
    /* ICECAST_LOG_DEBUG("send YP (%s):%s", cmd, post); */
320 321 322 323 324 325 326
    yp->cmd_ok = 0;
    curl_easy_setopt (server->curl, CURLOPT_POSTFIELDS, post);
    curl_easy_setopt (server->curl, CURLOPT_WRITEHEADER, yp);
    curlcode = curl_easy_perform (server->curl);
    if (curlcode)
    {
        yp->process = do_yp_add;
327
        yp->next_update = now + 1200;
328
        ICECAST_LOG_ERROR("connection to %s failed with \"%s\"", server->url, server->curl_error);
329
        return -2;
330
    }
331 332
    if (yp->cmd_ok == 0)
    {
333 334
        if (yp->error_msg == NULL)
            yp->error_msg = strdup ("no response from server");
335 336
        if (yp->process == do_yp_add)
        {
337
            ICECAST_LOG_ERROR("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
338
            yp->next_update = now + 7200;
339
        }
340
        if (yp->process == do_yp_touch)
341
        {
342 343 344 345 346 347 348
            /* At this point the touch request failed, either because they rejected our session
             * or the server isn't accessible. This means we have to wait before doing another
             * add request. We have a minimum delay but we could allow the directory server to
             * give us a wait time using the TouchFreq header. This time could be given in such
             * cases as a firewall block or incorrect listenurl.
             */
            if (yp->touch_interval < 1200)
349
                yp->next_update = now + 1200;
350
            else
351
                yp->next_update = now + yp->touch_interval;
352
            ICECAST_LOG_INFO("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
353
        }
354
        yp->process = do_yp_add;
355
        free(yp->sid);
356
        yp->sid = NULL;
357
        return -1;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
358
    }
359
    ICECAST_LOG_DEBUG("YP %s at %s succeeded", cmd, server->url);
360 361 362 363 364
    return 0;
}


/* routines for building and issues requests to the YP server */
365
static int do_yp_remove (ypdata_t *yp, char *s, unsigned len)
366
{
367 368
    int ret = 0;

369 370
    if (yp->sid)
    {
371
        ret = snprintf (s, len, "action=remove&sid=%s", yp->sid);
372 373 374
        if (ret >= (signed)len)
            return ret+1;

375
        ICECAST_LOG_INFO("clearing up YP entry for %s", yp->mount);
376
        ret = send_to_yp ("remove", yp, s);
377 378
        free (yp->sid);
        yp->sid = NULL;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
379
    }
380 381
    yp->remove = 1;
    yp->process = do_yp_add;
382
    yp_update = 1;
383

384
    return ret;
385 386
}

387

388
/* Fetch stat (or, if that is unset and fallback is non-NULL, fallback) for
 * the entry's mount and store it in the yp field selected by type.
 */
static void refresh_yp_info (ypdata_t *yp, const char *stat, const char *fallback, int type)
{
    char *value = stats_get_value (yp->mount, stat);

    if (value == NULL && fallback)
        value = stats_get_value (yp->mount, fallback);
    add_yp_info (yp, value, type);
    free (value);
}


/* Refresh the entry's details from the stats engine and send an "add"
 * request.
 *
 * s/len - scratch buffer for the POST body
 *
 * Returns the buffer size required (> len) when s is too small, otherwise
 * the send_to_yp result.  On success the entry switches to touching, with
 * the first touch due in 5 seconds.
 */
static int do_yp_add (ypdata_t *yp, char *s, unsigned len)
{
    int ret;

    refresh_yp_info (yp, "server_type", NULL, YP_SERVER_TYPE);
    refresh_yp_info (yp, "server_name", NULL, YP_SERVER_NAME);
    refresh_yp_info (yp, "server_url", NULL, YP_SERVER_URL);
    refresh_yp_info (yp, "genre", NULL, YP_SERVER_GENRE);
    refresh_yp_info (yp, "bitrate", "ice-bitrate", YP_BITRATE);
    refresh_yp_info (yp, "server_description", NULL, YP_SERVER_DESC);
    refresh_yp_info (yp, "subtype", NULL, YP_SUBTYPE);
    refresh_yp_info (yp, "audio_info", NULL, YP_AUDIO_INFO);

    ret = snprintf (s, len, "action=add&sn=%s&genre=%s&cpswd=%s&desc="
                    "%s&url=%s&listenurl=%s&type=%s&stype=%s&b=%s&%s\r\n",
                    yp->server_name, yp->server_genre, yp->cluster_password,
                    yp->server_desc, yp->url, yp->listen_url,
                    yp->server_type, yp->subtype, yp->bitrate, yp->audio_info);
    if (ret >= (signed)len)
        return ret+1;

    ret = send_to_yp ("add", yp, s);
    if (ret == 0)
    {
        yp->process = do_yp_touch;
        /* force first touch in 5 secs */
        yp->next_update = time(NULL) + 5;
    }
    return ret;
}


/* Send a "touch" request carrying the current song and listener counts.
 *
 * s/len - scratch buffer for the POST body
 *
 * Returns the buffer size required (> len) when s is too small, 0 on
 * success (the next touch is then scheduled touch_interval seconds ahead)
 * and -1 on any failure.
 * NOTE(review): a -2 (server unreachable) from send_to_yp is reported as -1
 * here, so the skip logic in yp_process_server is not triggered by a failed
 * touch -- confirm that this is intended.
 */
static int do_yp_touch (ypdata_t *yp, char *s, unsigned len)
{
    unsigned listeners = 0, max_listeners = 1;
    char *val, *artist, *title;
    int ret;

    artist = (char *)stats_get_value (yp->mount, "artist");
    title = (char *)stats_get_value (yp->mount, "title");
    if (artist || title)
    {
        const char *separator = " - ";

        if (artist == NULL)
        {
            artist = strdup ("");
            separator = "";
        }
        if (title == NULL)
            title = strdup ("");

        /* either strdup may have failed; only build the song from real strings */
        if (artist && title)
        {
            size_t song_len = strlen (artist) + strlen (separator) + strlen (title) + 1;
            char *song = malloc (song_len);

            if (song)
            {
                snprintf (song, song_len, "%s%s%s", artist, separator, title);
                add_yp_info (yp, song, YP_CURRENT_SONG);
                stats_event (yp->mount, "yp_currently_playing", song);
                free (song);
            }
        }
    }
    free (artist);
    free (title);

    val = (char *)stats_get_value (yp->mount, "listeners");
    if (val)
    {
        listeners = atoi (val);
        free (val);
    }

    /* no per-mount limit, "unlimited" or a negative value: report the global one */
    val = stats_get_value (yp->mount, "max_listeners");
    if (val == NULL || strcmp (val, "unlimited") == 0 || atoi (val) < 0)
        max_listeners = client_limit;
    else
        max_listeners = atoi (val);
    free (val);

    val = stats_get_value (yp->mount, "subtype");
    if (val)
    {
        add_yp_info (yp, val, YP_SUBTYPE);
        free (val);
    }

    ret = snprintf (s, len, "action=touch&sid=%s&st=%s"
            "&listeners=%u&max_listeners=%u&stype=%s\r\n",
            yp->sid, yp->current_song, listeners, max_listeners, yp->subtype);

    if (ret >= (signed)len)
        return ret+1; /* space required for above text and nul*/

    if (send_to_yp ("touch", yp, s) == 0)
    {
        yp->next_update = now + yp->touch_interval;
        return 0;
    }
    return -1;
}


/* Run the pending request of one entry if it is due.
 *
 * server - unused; the entry carries its own server pointer
 * yp     - entry to process
 *
 * Returns 0 if nothing was due or the scratch buffer could not be
 * allocated, otherwise the non-positive result of the entry's process
 * handler.
 */
static int process_ypdata(struct yp_server *server, ypdata_t *yp)
{
    unsigned len = 1024;
    char *s = NULL;

    (void)server;

    if (now < yp->next_update)
        return 0;

    /* loop just in case the memory area isn't big enough: a handler that
     * needs more room returns the size it wants and is called again */
    for (;;)
    {
        int ret;
        char *tmp = realloc (s, len);

        if (tmp == NULL)
        {
            /* realloc leaves the old block allocated on failure */
            free (s);
            return 0;
        }
        s = tmp;

        if (yp->release)
        {
            yp->process = do_yp_remove;
            yp->next_update = 0;
        }

        ret = yp->process (yp, s, len);
        if (ret <= 0)
        {
            free (s);
            return ret;
        }
        len = ret;
    }
}


static void yp_process_server (struct yp_server *server)
547
{
548
    ypdata_t *yp;
549
    int state = 0;
550

551
    /* ICECAST_LOG_DEBUG("processing yp server %s", server->url); */
552 553 554 555
    yp = server->mounts;
    while (yp)
    {
        now = time (NULL);
556 557 558 559 560
        /* if one of the streams shows that the server cannot be contacted then mark the
         * other entries for an update later. Assume YP server is dead and skip it for now
         */
        if (state == -2)
        {
561
            ICECAST_LOG_DEBUG("skiping %s on %s", yp->mount, server->url);
562 563 564 565 566
            yp->process = do_yp_add;
            yp->next_update += 900;
        }
        else
            state = process_ypdata (server, yp);
567 568 569 570 571 572
        yp = yp->next;
    }
}



573
static ypdata_t *create_yp_entry (const char *mount)
574 575
{
    ypdata_t *yp;
576
    char *s;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
577

578 579 580 581 582 583
    yp = calloc (1, sizeof (ypdata_t));
    do
    {
        unsigned len = 512;
        int ret;
        char *url;
584
        mount_proxy *mountproxy = NULL;
585 586 587 588
        ice_config_t *config;

        if (yp == NULL)
            break;
589
        yp->mount = strdup (mount);
590 591 592 593 594 595 596 597 598
        yp->server_name = strdup ("");
        yp->server_desc = strdup ("");
        yp->server_genre = strdup ("");
        yp->bitrate = strdup ("");
        yp->server_type = strdup ("");
        yp->cluster_password = strdup ("");
        yp->url = strdup ("");
        yp->current_song = strdup ("");
        yp->audio_info = strdup ("");
599
        yp->subtype = strdup ("");
600 601 602 603 604 605
        yp->process = do_yp_add;

        url = malloc (len);
        if (url == NULL)
            break;
        config = config_get_config();
606
        ret = snprintf (url, len, "http://%s:%d%s", config->hostname, config->port, mount);
607
        if (ret >= (signed)len)
608
        {
609 610
            s = realloc (url, ++ret);
            if (s) url = s;
611
            snprintf (url, ret, "http://%s:%d%s", config->hostname, config->port, mount);
612
        }
613

614
        mountproxy = config_find_mount (config, mount, MOUNT_TYPE_NORMAL);
615
        if (mountproxy && mountproxy->cluster_password)
616
            add_yp_info (yp, mountproxy->cluster_password, YP_CLUSTER_PASSWORD);
617
        config_release_config();
618

619 620 621 622 623 624 625 626 627 628 629 630 631 632
        yp->listen_url = util_url_escape (url);
        free (url);
        if (yp->listen_url == NULL)
            break;

        return yp;
    } while (0);

    yp_destroy_ypdata (yp);
    return NULL;
}


/* Check for changes in the YP servers configured */
633
static void check_servers (void)
634
{
635 636
    struct yp_server *server = (struct yp_server *)active_yps,
                     **server_p = (struct yp_server **)&active_yps;
637 638 639 640 641 642

    while (server)
    {
        if (server->remove)
        {
            struct yp_server *to_go = server;
643
            ICECAST_LOG_DEBUG("YP server \"%s\"removed", server->url);
644 645
            *server_p = server->next;
            server = server->next;
646
            destroy_yp_server(to_go);
647
            continue;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
648
        }
649 650 651 652 653 654 655 656
        server_p = &server->next;
        server = server->next;
    }
    /* add new server entries */
    while (pending_yps)
    {
        avl_node *node;

657
        server = (struct yp_server *)pending_yps;
658 659
        pending_yps = server->next;

660
        ICECAST_LOG_DEBUG("Add pending yps %s", server->url);
661
        server->next = (struct yp_server *)active_yps;
662 663 664 665 666 667 668 669 670 671
        active_yps = server;

        /* new YP server configured, need to populate with existing sources */
        avl_tree_rlock (global.source_tree);
        node = avl_get_first (global.source_tree);
        while (node)
        {
            ypdata_t *yp;

            source_t *source = node->key;
672
            if (source->yp_public && (yp = create_yp_entry (source->mount)) != NULL)
673
            {
674
                ICECAST_LOG_DEBUG("Adding existing mount %s", source->mount);
675 676 677 678
                yp->server = server;
                yp->touch_interval = server->touch_interval;
                yp->next = server->mounts;
                server->mounts = yp;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
679
            }
680
            node = avl_get_next (node);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
681
        }
682
        avl_tree_unlock (global.source_tree);
683
    }
684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703
}


/* Move the server's pending entries to the front of its main list, keeping
 * their order, and log how many were moved.  Does nothing if no entries
 * are pending.
 */
static void add_pending_yp (struct yp_server *server)
{
    ypdata_t *tail;
    unsigned count = 1;

    if (server->pending_mounts == NULL)
        return;

    /* find the last pending entry so the current list can follow it */
    for (tail = server->pending_mounts; tail->next; tail = tail->next)
        count++;

    tail->next = server->mounts;
    server->mounts = server->pending_mounts;
    server->pending_mounts = NULL;
    ICECAST_LOG_DEBUG("%u YP entries added to %s", count, server->url);
}

static void delete_marked_yp(struct yp_server *server)
711 712
{
    ypdata_t *yp = server->mounts, **prev = &server->mounts;
713

714 715 716 717 718
    while (yp)
    {
        if (yp->remove)
        {
            ypdata_t *to_go = yp;
719
            ICECAST_LOG_DEBUG("removed %s from YP server %s", yp->mount, server->url);
720 721
            *prev = yp->next;
            yp = yp->next;
722
            yp_destroy_ypdata(to_go);
723 724 725 726 727
            continue;
        }
        prev = &yp->next;
        yp = yp->next;
    }
728
}
729 730 731


static void *yp_update_thread(void *arg)
732
{
733
    ICECAST_LOG_INFO("YP update thread started");
734 735 736 737 738 739 740 741 742 743

    yp_running = 1;
    while (yp_running)
    {
        struct yp_server *server;

        thread_sleep (200000);

        /* do the YP communication */
        thread_rwlock_rlock (&yp_lock);
744
        server = (struct yp_server *)active_yps;
745 746
        while (server)
        {
747
            /* ICECAST_LOG_DEBUG("trying %s", server->url); */
748 749
            yp_process_server (server);
            server = server->next;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
750
        }
751 752 753 754 755 756 757
        thread_rwlock_unlock (&yp_lock);

        /* update the local YP structure */
        if (yp_update)
        {
            thread_rwlock_wlock (&yp_lock);
            check_servers ();
758
            server = (struct yp_server *)active_yps;
759 760
            while (server)
            {
761
                /* ICECAST_LOG_DEBUG("Checking yps %s", server->url); */
762 763 764
                add_pending_yp (server);
                delete_marked_yp (server);
                server = server->next;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
765
            }
766 767
            yp_update = 0;
            thread_rwlock_unlock (&yp_lock);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
768 769
        }
    }
770 771 772 773 774
    thread_rwlock_destroy (&yp_lock);
    thread_mutex_destroy (&yp_pending_lock);
    /* free server and ypdata left */
    while (active_yps)
    {
775
        struct yp_server *server = (struct yp_server *)active_yps;
776 777 778
        active_yps = server->next;
        destroy_yp_server (server);
    }
779

780
    return NULL;
781
}
782

783 784 785


/* Free a YP entry and every string it owns.  NULL is accepted and ignored.
 * The entry is not unlinked from any list here.
 */
static void yp_destroy_ypdata(ypdata_t *ypdata)
{
    if (ypdata == NULL)
        return;

    /* free() accepts NULL, so unset fields need no individual checks */
    free (ypdata->mount);
    free (ypdata->url);
    free (ypdata->sid);
    free (ypdata->server_name);
    free (ypdata->server_desc);
    free (ypdata->server_genre);
    free (ypdata->cluster_password);
    free (ypdata->listen_url);
    free (ypdata->current_song);
    free (ypdata->bitrate);
    free (ypdata->server_type);
    free (ypdata->audio_info);
    free (ypdata->subtype);
    free (ypdata->error_msg);
    free (ypdata);
}

/* Replace one string field of yp with a URL-escaped copy of info.
 *
 * info - NUL-terminated string, passed to util_url_escape; NULL leaves the
 *        entry untouched
 * type - YP_* constant selecting the field; unknown values are ignored
 *
 * The previous value of the field is freed.  If escaping fails the field
 * keeps its old value.
 */
static void add_yp_info(ypdata_t *yp, void *info, int type)
{
    char **field;
    char *escaped;

    if (!info)
        return;

    switch (type)
    {
        case YP_SERVER_NAME:        field = &yp->server_name;       break;
        case YP_SERVER_DESC:        field = &yp->server_desc;       break;
        case YP_SERVER_GENRE:       field = &yp->server_genre;      break;
        case YP_SERVER_URL:         field = &yp->url;               break;
        case YP_BITRATE:            field = &yp->bitrate;           break;
        case YP_AUDIO_INFO:         field = &yp->audio_info;        break;
        case YP_SERVER_TYPE:        field = &yp->server_type;       break;
        case YP_CURRENT_SONG:       field = &yp->current_song;      break;
        case YP_CLUSTER_PASSWORD:   field = &yp->cluster_password;  break;
        case YP_SUBTYPE:            field = &yp->subtype;           break;
        default:
            return;
    }

    escaped = util_url_escape (info);
    if (escaped == NULL)
        return;

    free (*field);
    *field = escaped;
}


/* Add YP entries to active servers */
890
void yp_add (const char *mount)
891 892 893 894 895 896 897 898
{
    struct yp_server *server;

    /* make sure YP thread is not modifying the lists */
    thread_rwlock_rlock (&yp_lock);

    /* make sure we don't race against another yp_add */
    thread_mutex_lock (&yp_pending_lock);
899
    server = (struct yp_server *)active_yps;
900 901 902
    while (server)
    {
        ypdata_t *yp;
903 904 905 906

        /* on-demand relays may already have a YP entry */
        yp = find_yp_mount (server->mounts, mount);
        if (yp == NULL)
907
        {
908 909 910 911
            /* add new ypdata to each servers pending yp */
            yp = create_yp_entry (mount);
            if (yp)
            {
912
                ICECAST_LOG_DEBUG("Adding %s to %s", mount, server->url);
913 914 915
                yp->server = server;
                yp->touch_interval = server->touch_interval;
                yp->next = server->pending_mounts;
916
                yp->next_update = time(NULL) + 60;
917 918 919
                server->pending_mounts = yp;
                yp_update = 1;
            }
920
        }
921
        else
922
            ICECAST_LOG_DEBUG("YP entry %s already exists", mount);
923 924 925 926 927 928 929 930 931 932 933
        server = server->next;
    }
    thread_mutex_unlock (&yp_pending_lock);
    thread_rwlock_unlock (&yp_lock);
}



/* Mark an existing entry in the YP list as to be marked for deletion */
void yp_remove (const char *mount)
{
934
    struct yp_server *server = (struct yp_server *)active_yps;
935 936 937 938

    thread_rwlock_rlock (&yp_lock);
    while (server)
    {
939 940 941
        ypdata_t *list = server->mounts;

        while (1)
942
        {
943 944 945 946 947 948 949 950
            ypdata_t *yp = find_yp_mount (list, mount);
            if (yp == NULL)
                break;
            if (yp->release || yp->remove)
            {
                list = yp->next;
                continue;   /* search again these are old entries */
            }
951
            ICECAST_LOG_DEBUG("release %s on YP %s", mount, server->url);
952
            yp->release = 1;
953
            yp->next_update = 0;
954
        }
955
        server = server->next;
956
    }
957
    thread_rwlock_unlock (&yp_lock);
958
}
959 960 961 962 963 964


/* This is similar to yp_remove, but we force a touch
 * attempt */
void yp_touch (const char *mount)
{
965
    struct yp_server *server = (struct yp_server *)active_yps;
966
    ypdata_t *search_list = NULL;
967 968

    thread_rwlock_rlock (&yp_lock);
969 970 971
    if (server)
        search_list = server->mounts;

972 973
    while (server)
    {
974
        ypdata_t *yp = find_yp_mount (search_list, mount);
975 976
        if (yp)
        {
977
            /* we may of found old entries not purged yet, so skip them */
978
            if (yp->release != 0 || yp->remove != 0)
979 980 981 982
            {
                search_list = yp->next;
                continue;
            }
983 984 985
            /* don't update the directory if there is a touch scheduled soon */
            if (yp->process == do_yp_touch && now + yp->touch_interval - yp->next_update > 60)
                yp->next_update = now + 3;
986 987
        }
        server = server->next;
988 989
        if (server)
            search_list = server->mounts;
990 991 992 993 994
    }
    thread_rwlock_unlock (&yp_lock);
}


995
void yp_shutdown (void)
996 997 998
{
    yp_running = 0;
    yp_update = 1;
999 1000
    if (yp_thread)
        thread_join (yp_thread);
1001
    curl_global_cleanup();
1002 1003
    free ((char*)server_version);
    server_version = NULL;
1004
    ICECAST_LOG_INFO("YP thread down");
1005 1006
}