yp.c 26.5 KB
Newer Older
1 2 3 4 5
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
6
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
7 8 9 10 11 12
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 15 16 17
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

18 19
#include <stdio.h>
#include <string.h>
20
#include <stdlib.h>
21

Marvin Scholz's avatar
Marvin Scholz committed
22
#include "common/thread/thread.h"
23

24
#include "curl.h"
25 26 27 28 29 30
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "logging.h"
#include "format.h"
#include "source.h"
31
#include "cfgfile.h"
32
#include "stats.h"
33 34 35 36

#ifdef WIN32
#define snprintf _snprintf
#endif
37

38
#define CATMODULE "yp"
39

40 41 42
struct yp_server
{
    char        *url;
43
    char        *server_id;
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
    unsigned    url_timeout;
    unsigned    touch_interval;
    int         remove;

    CURL *curl;
    struct ypdata_tag *mounts, *pending_mounts;
    struct yp_server *next;
    char curl_error[CURL_ERROR_SIZE];
};



/* State kept for one mountpoint as announced to one YP server. */
typedef struct ypdata_tag
{
    int remove;                 /* non-zero: delete_marked_yp() frees this entry */
    int release;                /* set by yp_remove(); the next processing pass sends a remove */
    int cmd_ok;                 /* set to 1 when a "YPResponse: 1" header is received */

    char *sid;                  /* value of the "SID: " header returned for an add */
    char *mount;                /* mountpoint name; key compared by find_yp_mount() */
    char *url;                  /* the strings below are sent as request fields */
    char *listen_url;
    char *server_name;
    char *server_desc;
    char *server_genre;
    char *cluster_password;
    char *bitrate;
    char *audio_info;
    char *server_type;
    char *current_song;
    char *subtype;

    struct yp_server *server;   /* directory server this entry belongs to */
    time_t next_update;         /* entry is left alone while now < next_update */
    unsigned touch_interval;    /* seconds between touches; updated from "TouchFreq: " */
    char *error_msg;            /* value of the last "YPMessage: " header */

    /* request handler to run next (do_yp_add, do_yp_touch or do_yp_remove) */
    int (*process)(struct ypdata_tag *yp, char *s, unsigned len);

    struct ypdata_tag *next;    /* next entry in the owning server's list */
} ypdata_t;


/* Read-locked by the public entry points and by the update thread while it
 * talks to the directories; write-locked by the update thread while it
 * restructures the server and mount lists.
 */
static rwlock_t yp_lock;
/* Serialises concurrent yp_add() callers. */
static mutex_t yp_pending_lock;

/* Servers currently in use, and servers waiting to be activated. */
static volatile struct yp_server *active_yps = NULL;
static volatile struct yp_server *pending_yps = NULL;
/* Non-zero when the update thread has list maintenance to do. */
static volatile int yp_update = 0;
/* Loop flag of the update thread; cleared by yp_shutdown(). */
static int yp_running;
/* Current time, refreshed by yp_process_server() before each entry. */
static time_t now;
static thread_type *yp_thread;
/* Copy of config->client_limit; fallback for max_listeners in a touch. */
static volatile unsigned client_limit = 0;
/* Copy of config->server_id made by yp_recheck_config(). */
static volatile char *server_version = NULL;

static void *yp_update_thread(void *arg);

/* request handlers stored in ypdata_t.process */
static int do_yp_add(ypdata_t *yp, char *s, unsigned len);
static int do_yp_touch(ypdata_t *yp, char *s, unsigned len);
static int do_yp_remove(ypdata_t *yp, char *s, unsigned len);

/* list and entry helpers */
static void add_yp_info(ypdata_t *yp, void *info, int type);
static void add_pending_yp (struct yp_server *server);
static void delete_marked_yp(struct yp_server *server);
static void yp_destroy_ypdata(ypdata_t *ypdata);


/* curl callback used to parse headers coming back from the YP server */
108
static size_t handle_returned_header (void *ptr, size_t size, size_t nmemb, void *stream)
109 110
{
    ypdata_t *yp = stream;
111
    size_t bytes = size * nmemb;
112

113
    /* ICECAST_LOG_DEBUG("header from YP is \"%.*s\"", bytes, ptr); */
114
    if (strncasecmp (ptr, "YPResponse: 1", 13) == 0)
115 116
        yp->cmd_ok = 1;

117
    if (strncasecmp (ptr, "YPMessage: ", 11) == 0)
118
    {
119
        size_t len = bytes - 11;
120 121 122
        free (yp->error_msg);
        yp->error_msg = calloc (1, len);
        if (yp->error_msg)
123
            sscanf (ptr + 11, "%[^\r\n]", yp->error_msg);
124 125 126 127
    }

    if (yp->process == do_yp_add)
    {
128
        if (strncasecmp (ptr, "SID: ", 5) == 0)
129
        {
130
            size_t len = bytes - 5;
131 132 133
            free (yp->sid);
            yp->sid = calloc (1, len);
            if (yp->sid)
134
                sscanf (ptr + 5, "%[^\r\n]", yp->sid);
135
        }
136
    }
137
    if (strncasecmp (ptr, "TouchFreq: ", 11) == 0)
138 139
    {
        unsigned secs;
140 141
        if ( sscanf (ptr + 11, "%u", &secs) != 1 )
            secs = 0;
142 143
        if (secs < 30)
            secs = 30;
144
        ICECAST_LOG_DEBUG("server touch interval is %u", secs);
145
        yp->touch_interval = secs;
146
    }
147
    return (size_t)bytes;
148 149 150 151 152 153 154 155
}


/* search the active and pending YP server lists */
static struct yp_server *find_yp_server (const char *url)
{
    struct yp_server *server;

156
    server = (struct yp_server *)active_yps;
157 158 159 160 161 162
    while (server)
    {
        if (strcmp (server->url, url) == 0)
            return server;
        server = server->next;
    }
163
    server = (struct yp_server *)pending_yps;
164 165 166 167 168 169 170 171 172 173 174 175
    while (server)
    {
        if (strcmp (server->url, url) == 0)
            break;
        server = server->next;
    }
    return server;
}


static void destroy_yp_server (struct yp_server *server)
{
176 177
    ypdata_t *yp;

178 179
    if (server == NULL)
        return;
180
    ICECAST_LOG_DEBUG("Removing YP server entry for %s", server->url);
181 182 183 184 185 186 187 188 189 190 191 192 193 194

    /* delete yps:
     * first move all pendings into main queue.
     * then mark all main queue entries for deleting.
     * then remove all marked entries.
     */
    add_pending_yp(server);
    yp = server->mounts;
    while (yp) {
        yp->remove = 1;
        yp = yp->next;
    }
    delete_marked_yp(server);

195
    icecast_curl_free(server->curl);
196 197
    if (server->mounts) ICECAST_LOG_WARN("active ypdata not freed");
    if (server->pending_mounts) ICECAST_LOG_WARN("pending ypdata not freed");
198
    free (server->url);
199
    free (server->server_id);
200 201 202 203 204 205
    free (server);
}



/* search for a ypdata entry corresponding to a specific mountpoint */
206
static ypdata_t *find_yp_mount (ypdata_t *mounts, const char *mount)
207
{
208
    ypdata_t *yp = mounts;
209 210 211 212 213 214 215 216 217
    while (yp)
    {
        if (strcmp (yp->mount, mount) == 0)
            break;
        yp = yp->next;
    }
    return yp;
}

218 219 220

/* Bring the YP server lists in line with `config`.
 *
 * Every active server is first flagged for removal; servers that are still
 * listed in the configuration get the flag cleared again, and servers that
 * are not yet known are created and queued on the pending list. The update
 * thread is then asked (via yp_update) to apply the changes.
 *
 * Setting up new servers stops at the first allocation or curl failure.
 */
void yp_recheck_config (ice_config_t *config)
{
    int i;
    struct yp_server *server;

    ICECAST_LOG_DEBUG("Updating YP configuration");
    thread_rwlock_rlock (&yp_lock);

    for (server = (struct yp_server *)active_yps; server != NULL; server = server->next)
        server->remove = 1;

    client_limit = config->client_limit;
    free ((char*)server_version);
    server_version = strdup (config->server_id);

    /* for each yp url in config, check to see if one exists
       if not, then add it. */
    for (i = 0; i < config->num_yp_directories; i++)
    {
        server = find_yp_server (config->yp_url[i]);
        if (server != NULL)
        {
            server->remove = 0;
            continue;
        }

        server = calloc (1, sizeof (*server));
        if (server == NULL)
            break;

        /* server_version is NULL if the strdup above failed */
        if (server_version != NULL)
            server->server_id = strdup ((char *)server_version);
        server->url = strdup (config->yp_url[i]);
        if (server->url == NULL)
        {
            /* the URL is the lookup key, an entry without one is unusable */
            free (server->server_id);
            free (server);
            break;
        }
        server->url_timeout = config->yp_url_timeout[i];
        server->touch_interval = config->yp_touch_interval[i];
        server->curl = icecast_curl_new(server->url, &(server->curl_error[0]));
        if (server->curl == NULL)
        {
            destroy_yp_server (server);
            break;
        }
        if (server->url_timeout > 10 || server->url_timeout < 1)
            server->url_timeout = 6;
        if (server->touch_interval < 30)
            server->touch_interval = 30;
        curl_easy_setopt (server->curl, CURLOPT_HEADERFUNCTION, handle_returned_header);
        server->next = (struct yp_server *)pending_yps;
        pending_yps = server;
        ICECAST_LOG_INFO("Adding new YP server \"%s\" (timeout %us, default interval %us)",
                server->url, server->url_timeout, server->touch_interval);
    }
    thread_rwlock_unlock (&yp_lock);
    yp_update = 1;
}

279

280
void yp_initialize(void)
281
{
282
    ice_config_t *config = config_get_config();
283 284
    thread_rwlock_create (&yp_lock);
    thread_mutex_create (&yp_pending_lock);
285 286
    yp_recheck_config (config);
    config_release_config ();
287 288
    yp_thread = thread_create("YP Touch Thread", yp_update_thread,
                            (void *)NULL, THREAD_ATTACHED);
289
}
290 291 292



293
/* handler for curl, checks if successful handling occurred
294 295
 * return 0 for ok, -1 for this entry failed, -2 for server fail.
 * On failure case, update and process are modified
296
 */
297
static int send_to_yp (const char *cmd, ypdata_t *yp, char *post)
298
{
299 300 301
    int curlcode;
    struct yp_server *server = yp->server;

302
    /* ICECAST_LOG_DEBUG("send YP (%s):%s", cmd, post); */
303 304 305 306 307 308 309
    yp->cmd_ok = 0;
    curl_easy_setopt (server->curl, CURLOPT_POSTFIELDS, post);
    curl_easy_setopt (server->curl, CURLOPT_WRITEHEADER, yp);
    curlcode = curl_easy_perform (server->curl);
    if (curlcode)
    {
        yp->process = do_yp_add;
310
        yp->next_update = now + 1200;
311
        ICECAST_LOG_ERROR("connection to %s failed with \"%s\"", server->url, server->curl_error);
312
        return -2;
313
    }
314 315
    if (yp->cmd_ok == 0)
    {
316 317
        if (yp->error_msg == NULL)
            yp->error_msg = strdup ("no response from server");
318 319
        if (yp->process == do_yp_add)
        {
320
            ICECAST_LOG_ERROR("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
321
            yp->next_update = now + 7200;
322
        }
323
        if (yp->process == do_yp_touch)
324
        {
325 326 327 328 329 330 331
            /* At this point the touch request failed, either because they rejected our session
             * or the server isn't accessible. This means we have to wait before doing another
             * add request. We have a minimum delay but we could allow the directory server to
             * give us a wait time using the TouchFreq header. This time could be given in such
             * cases as a firewall block or incorrect listenurl.
             */
            if (yp->touch_interval < 1200)
332
                yp->next_update = now + 1200;
333
            else
334
                yp->next_update = now + yp->touch_interval;
335
            ICECAST_LOG_INFO("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
336
        }
337
        yp->process = do_yp_add;
338
        free(yp->sid);
339
        yp->sid = NULL;
340
        return -1;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
341
    }
342
    ICECAST_LOG_DEBUG("YP %s at %s succeeded", cmd, server->url);
343 344 345 346 347
    return 0;
}


/* routines for building and issuing requests to the YP server */
348
/* Request handler: tell the directory to drop this entry, then flag the
 * entry itself for removal from the local list.
 *
 * s/len is the scratch buffer for the request body. If it is too small the
 * required size is returned (a positive value) and nothing else is done.
 * Otherwise the result of send_to_yp() is returned, or 0 when the entry
 * never had a session id and so nothing had to be sent.
 */
static int do_yp_remove (ypdata_t *yp, char *s, unsigned len)
{
    int result = 0;

    if (yp->sid != NULL)
    {
        int needed = snprintf (s, len, "action=remove&sid=%s", yp->sid);

        if (needed >= (signed)len)
            return needed + 1;

        ICECAST_LOG_INFO("clearing up YP entry for %s", yp->mount);
        result = send_to_yp ("remove", yp, s);
        free (yp->sid);
        yp->sid = NULL;
    }

    /* let the update thread purge the entry */
    yp->remove = 1;
    yp->process = do_yp_add;
    yp_update = 1;

    return result;
}

370

371
/* Read stat `name` of the entry's mount and store it as field `type`.
 * A missing stat leaves the field unchanged (add_yp_info ignores NULL).
 */
static void yp_refresh_stat (ypdata_t *yp, const char *name, int type)
{
    char *value = stats_get_value (yp->mount, name);

    add_yp_info (yp, value, type);
    free (value);
}


/* Request handler: announce the entry to the directory.
 *
 * The entry's descriptive fields are refreshed from the stats first. s/len is
 * the scratch buffer for the request body; if it is too small the required
 * size is returned (a positive value). Otherwise the result of send_to_yp()
 * is returned, and on success the entry switches to do_yp_touch with the
 * first touch due in 5 seconds.
 */
static int do_yp_add (ypdata_t *yp, char *s, unsigned len)
{
    int ret;
    char *value;
    ice_config_t *config;
    char *admin;

    config = config_get_config();
    admin = util_url_escape(config->admin);
    config_release_config();

    yp_refresh_stat (yp, "server_type", YP_SERVER_TYPE);
    yp_refresh_stat (yp, "server_name", YP_SERVER_NAME);
    yp_refresh_stat (yp, "server_url", YP_SERVER_URL);
    yp_refresh_stat (yp, "genre", YP_SERVER_GENRE);

    /* the bitrate may be published under either name */
    value = stats_get_value (yp->mount, "bitrate");
    if (value == NULL)
        value = stats_get_value (yp->mount, "ice-bitrate");
    add_yp_info (yp, value, YP_BITRATE);
    free (value);

    yp_refresh_stat (yp, "server_description", YP_SERVER_DESC);
    yp_refresh_stat (yp, "subtype", YP_SUBTYPE);
    yp_refresh_stat (yp, "audio_info", YP_AUDIO_INFO);

    /* util_url_escape may hand back NULL, which must not reach %s */
    ret = snprintf (s, len, "action=add&admin=%s&sn=%s&genre=%s&cpswd=%s&desc="
                    "%s&url=%s&listenurl=%s&type=%s&stype=%s&b=%s&%s\r\n",
                    admin ? admin : "",
                    yp->server_name, yp->server_genre, yp->cluster_password,
                    yp->server_desc, yp->url, yp->listen_url,
                    yp->server_type, yp->subtype, yp->bitrate, yp->audio_info);
    free(admin);

    if (ret >= (signed)len)
        return ret+1;
    ret = send_to_yp ("add", yp, s);
    if (ret == 0)
    {
        yp->process = do_yp_touch;
        /* force first touch in 5 secs */
        yp->next_update = time(NULL) + 5;
    }
    return ret;
}


437
/* Request handler: refresh the directory listing of an announced entry.
 *
 * Updates the current song (published as the "yp_currently_playing" stat as
 * well) and the subtype from the stats, then sends the listener counts.
 * s/len is the scratch buffer for the request body; if it is too small the
 * required size is returned (a positive value). Returns 0 when the touch was
 * accepted and -1 for any failure reported by send_to_yp().
 */
static int do_yp_touch (ypdata_t *yp, char *s, unsigned len)
{
    unsigned listeners = 0, max_listeners = 1;
    char *val, *artist, *title;
    int ret;

    artist = stats_get_value (yp->mount, "artist");
    title = stats_get_value (yp->mount, "title");
    if (artist || title)
    {
        /* use constant fallbacks for a missing part, nothing to allocate */
        const char *artist_text = artist ? artist : "";
        const char *separator = artist ? " - " : "";
        const char *title_text = title ? title : "";
        size_t song_len = strlen (artist_text) + strlen (separator) + strlen (title_text) + 1;
        char *song = malloc (song_len);

        if (song)
        {
            snprintf (song, song_len, "%s%s%s", artist_text, separator, title_text);
            add_yp_info(yp, song, YP_CURRENT_SONG);
            stats_event (yp->mount, "yp_currently_playing", song);
            free (song);
        }
    }
    free (artist);
    free (title);

    val = stats_get_value (yp->mount, "listeners");
    if (val)
    {
        listeners = atoi (val);
        free (val);
    }
    val = stats_get_value (yp->mount, "max_listeners");
    if (val == NULL || strcmp (val, "unlimited") == 0 || atoi(val) < 0)
        max_listeners = client_limit;
    else
        max_listeners = atoi (val);
    free (val);

    val = stats_get_value (yp->mount, "subtype");
    if (val)
    {
        add_yp_info (yp, val, YP_SUBTYPE);
        free (val);
    }

    /* sid stays NULL if the add was accepted without a SID header */
    ret = snprintf (s, len, "action=touch&sid=%s&st=%s"
            "&listeners=%u&max_listeners=%u&stype=%s\r\n",
            yp->sid ? yp->sid : "", yp->current_song,
            listeners, max_listeners, yp->subtype);

    if (ret >= (signed)len)
        return ret+1; /* space required for above text and nul*/

    if (send_to_yp ("touch", yp, s) == 0)
    {
        yp->next_update = now + yp->touch_interval;
        return 0;
    }
    return -1;
}
501

502 503


504
/* Run the pending request handler of `yp` if its next_update time has come.
 *
 * The handler gets a scratch buffer for the request body. A positive return
 * from the handler is the buffer size it needs, in which case the buffer is
 * enlarged and the handler is called again. An entry flagged for release is
 * switched to do_yp_remove first.
 *
 * Returns the handler's final result (<= 0), or 0 if nothing was due or the
 * buffer could not be allocated. `server` is not used.
 */
static int process_ypdata(struct yp_server *server, ypdata_t *yp)
{
    unsigned len = 1024;
    char *s = NULL;

    (void)server;

    if (now < yp->next_update)
        return 0;

    /* loop just in case the memory area isn't big enough */
    for (;;)
    {
        int ret;
        char *tmp = realloc(s, len);

        if (tmp == NULL)
        {
            /* realloc leaves the old buffer allocated on failure */
            free (s);
            return 0;
        }
        s = tmp;

        if (yp->release)
        {
            yp->process = do_yp_remove;
            yp->next_update = 0;
        }

        ret = yp->process (yp, s, len);
        if (ret <= 0)
        {
            free (s);
            return ret;
        }
        len = ret;
    }
}
536 537 538


static void yp_process_server (struct yp_server *server)
539
{
540
    ypdata_t *yp;
541
    int state = 0;
542

543
    /* ICECAST_LOG_DEBUG("processing yp server %s", server->url); */
544 545 546 547
    yp = server->mounts;
    while (yp)
    {
        now = time (NULL);
548 549 550 551 552
        /* if one of the streams shows that the server cannot be contacted then mark the
         * other entries for an update later. Assume YP server is dead and skip it for now
         */
        if (state == -2)
        {
553
            ICECAST_LOG_DEBUG("skipping %s on %s", yp->mount, server->url);
554 555 556 557 558
            yp->process = do_yp_add;
            yp->next_update += 900;
        }
        else
            state = process_ypdata (server, yp);
559 560 561 562 563 564
        yp = yp->next;
    }
}



565
static ypdata_t *create_yp_entry (const char *mount)
566 567
{
    ypdata_t *yp;
568
    char *s;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
569

570 571 572 573 574 575
    yp = calloc (1, sizeof (ypdata_t));
    do
    {
        unsigned len = 512;
        int ret;
        char *url;
576
        mount_proxy *mountproxy = NULL;
577 578 579 580
        ice_config_t *config;

        if (yp == NULL)
            break;
581
        yp->mount = strdup (mount);
582 583 584 585 586 587 588 589 590
        yp->server_name = strdup ("");
        yp->server_desc = strdup ("");
        yp->server_genre = strdup ("");
        yp->bitrate = strdup ("");
        yp->server_type = strdup ("");
        yp->cluster_password = strdup ("");
        yp->url = strdup ("");
        yp->current_song = strdup ("");
        yp->audio_info = strdup ("");
591
        yp->subtype = strdup ("");
592 593 594 595 596 597
        yp->process = do_yp_add;

        url = malloc (len);
        if (url == NULL)
            break;
        config = config_get_config();
598
        ret = snprintf (url, len, "http://%s:%d%s", config->hostname, config->port, mount);
599
        if (ret >= (signed)len)
600
        {
601 602
            s = realloc (url, ++ret);
            if (s) url = s;
603
            snprintf (url, ret, "http://%s:%d%s", config->hostname, config->port, mount);
604
        }
605

606
        mountproxy = config_find_mount (config, mount, MOUNT_TYPE_NORMAL);
607
        if (mountproxy && mountproxy->cluster_password)
608
            add_yp_info (yp, mountproxy->cluster_password, YP_CLUSTER_PASSWORD);
609
        config_release_config();
610

611 612 613 614 615 616 617 618 619 620 621 622 623 624
        yp->listen_url = util_url_escape (url);
        free (url);
        if (yp->listen_url == NULL)
            break;

        return yp;
    } while (0);

    yp_destroy_ypdata (yp);
    return NULL;
}


/* Check for changes in the YP servers configured */
625
static void check_servers (void)
626
{
627 628
    struct yp_server *server = (struct yp_server *)active_yps,
                     **server_p = (struct yp_server **)&active_yps;
629 630 631 632 633 634

    while (server)
    {
        if (server->remove)
        {
            struct yp_server *to_go = server;
635
            ICECAST_LOG_DEBUG("YP server \"%s\"removed", server->url);
636 637
            *server_p = server->next;
            server = server->next;
638
            destroy_yp_server(to_go);
639
            continue;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
640
        }
641 642 643 644 645 646 647 648
        server_p = &server->next;
        server = server->next;
    }
    /* add new server entries */
    while (pending_yps)
    {
        avl_node *node;

649
        server = (struct yp_server *)pending_yps;
650 651
        pending_yps = server->next;

652
        ICECAST_LOG_DEBUG("Add pending yps %s", server->url);
653
        server->next = (struct yp_server *)active_yps;
654 655 656 657 658 659 660 661 662 663
        active_yps = server;

        /* new YP server configured, need to populate with existing sources */
        avl_tree_rlock (global.source_tree);
        node = avl_get_first (global.source_tree);
        while (node)
        {
            ypdata_t *yp;

            source_t *source = node->key;
664
            if (source->yp_public && (yp = create_yp_entry (source->mount)) != NULL)
665
            {
666
                ICECAST_LOG_DEBUG("Adding existing mount %s", source->mount);
667 668 669 670
                yp->server = server;
                yp->touch_interval = server->touch_interval;
                yp->next = server->mounts;
                server->mounts = yp;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
671
            }
672
            node = avl_get_next (node);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
673
        }
674
        avl_tree_unlock (global.source_tree);
675
    }
676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695
}


/* Move the pending mount entries of `server` to the front of its main list.
 * Does nothing when there are no pending entries.
 */
static void add_pending_yp (struct yp_server *server)
{
    ypdata_t *old_head, *tail;
    unsigned count = 1;

    if (server->pending_mounts == NULL)
        return;

    old_head = server->mounts;
    server->mounts = server->pending_mounts;
    server->pending_mounts = NULL;

    /* find the last of the moved entries, counting them on the way */
    for (tail = server->mounts; tail->next != NULL; tail = tail->next)
        count++;
    tail->next = old_head;

    ICECAST_LOG_DEBUG("%u YP entries added to %s", count, server->url);
}

701

702
static void delete_marked_yp(struct yp_server *server)
703 704
{
    ypdata_t *yp = server->mounts, **prev = &server->mounts;
705

706 707 708 709 710
    while (yp)
    {
        if (yp->remove)
        {
            ypdata_t *to_go = yp;
711
            ICECAST_LOG_DEBUG("removed %s from YP server %s", yp->mount, server->url);
712 713
            *prev = yp->next;
            yp = yp->next;
714
            yp_destroy_ypdata(to_go);
715 716 717 718 719
            continue;
        }
        prev = &yp->next;
        yp = yp->next;
    }
720
}
721 722 723


static void *yp_update_thread(void *arg)
724
{
725
    ICECAST_LOG_INFO("YP update thread started");
726 727 728 729 730 731 732 733 734 735

    yp_running = 1;
    while (yp_running)
    {
        struct yp_server *server;

        thread_sleep (200000);

        /* do the YP communication */
        thread_rwlock_rlock (&yp_lock);
736
        server = (struct yp_server *)active_yps;
737 738
        while (server)
        {
739
            /* ICECAST_LOG_DEBUG("trying %s", server->url); */
740 741
            yp_process_server (server);
            server = server->next;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
742
        }
743 744 745 746 747 748 749
        thread_rwlock_unlock (&yp_lock);

        /* update the local YP structure */
        if (yp_update)
        {
            thread_rwlock_wlock (&yp_lock);
            check_servers ();
750
            server = (struct yp_server *)active_yps;
751 752
            while (server)
            {
753
                /* ICECAST_LOG_DEBUG("Checking yps %s", server->url); */
754 755 756
                add_pending_yp (server);
                delete_marked_yp (server);
                server = server->next;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
757
            }
758 759
            yp_update = 0;
            thread_rwlock_unlock (&yp_lock);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
760 761
        }
    }
762 763 764 765 766
    thread_rwlock_destroy (&yp_lock);
    thread_mutex_destroy (&yp_pending_lock);
    /* free server and ypdata left */
    while (active_yps)
    {
767
        struct yp_server *server = (struct yp_server *)active_yps;
768 769 770
        active_yps = server->next;
        destroy_yp_server (server);
    }
771

772
    return NULL;
773
}
774

775 776 777


/* Free a mount entry together with all strings it owns.
 * A NULL entry is ignored.
 */
static void yp_destroy_ypdata(ypdata_t *ypdata)
{
    if (ypdata == NULL)
        return;

    /* free() accepts NULL, so fields that were never set need no guard */
    free(ypdata->mount);
    free(ypdata->url);
    free(ypdata->sid);
    free(ypdata->server_name);
    free(ypdata->server_desc);
    free(ypdata->server_genre);
    free(ypdata->cluster_password);
    free(ypdata->listen_url);
    free(ypdata->current_song);
    free(ypdata->bitrate);
    free(ypdata->server_type);
    free(ypdata->audio_info);
    free(ypdata->subtype);
    free(ypdata->error_msg);
    free(ypdata);
}
821

822
/* Store a URL-escaped copy of `info` in the field of `yp` selected by `type`
 * (one of the YP_* constants), freeing the previous value. Nothing changes
 * when `info` is NULL, when escaping fails or when `type` is not recognised.
 */
static void add_yp_info(ypdata_t *yp, void *info, int type)
{
    char **field = NULL;
    char *escaped;

    if (info == NULL)
        return;

    escaped = util_url_escape(info);
    if (escaped == NULL)
        return;

    /* pick the field to replace */
    switch (type)
    {
        case YP_SERVER_NAME:      field = &yp->server_name;      break;
        case YP_SERVER_DESC:      field = &yp->server_desc;      break;
        case YP_SERVER_GENRE:     field = &yp->server_genre;     break;
        case YP_SERVER_URL:       field = &yp->url;              break;
        case YP_BITRATE:          field = &yp->bitrate;          break;
        case YP_AUDIO_INFO:       field = &yp->audio_info;       break;
        case YP_SERVER_TYPE:      field = &yp->server_type;      break;
        case YP_CURRENT_SONG:     field = &yp->current_song;     break;
        case YP_CLUSTER_PASSWORD: field = &yp->cluster_password; break;
        case YP_SUBTYPE:          field = &yp->subtype;          break;
        default:                                                 break;
    }

    if (field == NULL)
    {
        free (escaped);
        return;
    }
    free (*field);
    *field = escaped;
}


/* Add YP entries to active servers */
882
void yp_add (const char *mount)
883 884 885 886 887 888 889 890
{
    struct yp_server *server;

    /* make sure YP thread is not modifying the lists */
    thread_rwlock_rlock (&yp_lock);

    /* make sure we don't race against another yp_add */
    thread_mutex_lock (&yp_pending_lock);
891
    server = (struct yp_server *)active_yps;
892 893 894
    while (server)
    {
        ypdata_t *yp;
895 896 897 898

        /* on-demand relays may already have a YP entry */
        yp = find_yp_mount (server->mounts, mount);
        if (yp == NULL)
899
        {
900 901 902 903
            /* add new ypdata to each servers pending yp */
            yp = create_yp_entry (mount);
            if (yp)
            {
904
                ICECAST_LOG_DEBUG("Adding %s to %s", mount, server->url);
905 906 907
                yp->server = server;
                yp->touch_interval = server->touch_interval;
                yp->next = server->pending_mounts;
908
                yp->next_update = time(NULL) + 60;
909 910 911
                server->pending_mounts = yp;
                yp_update = 1;
            }
912
        }
913
        else
914
            ICECAST_LOG_DEBUG("YP entry %s already exists", mount);
915 916 917 918 919 920 921 922 923 924 925
        server = server->next;
    }
    thread_mutex_unlock (&yp_pending_lock);
    thread_rwlock_unlock (&yp_lock);
}



/* Mark an existing entry in the YP list as to be marked for deletion */
void yp_remove (const char *mount)
{
926
    struct yp_server *server = (struct yp_server *)active_yps;
927 928 929 930

    thread_rwlock_rlock (&yp_lock);
    while (server)
    {
931 932 933
        ypdata_t *list = server->mounts;

        while (1)
934
        {
935 936 937 938 939 940 941 942
            ypdata_t *yp = find_yp_mount (list, mount);
            if (yp == NULL)
                break;
            if (yp->release || yp->remove)
            {
                list = yp->next;
                continue;   /* search again these are old entries */
            }
943
            ICECAST_LOG_DEBUG("release %s on YP %s", mount, server->url);
944
            yp->release = 1;
945
            yp->next_update = 0;
946
        }
947
        server = server->next;
948
    }
949
    thread_rwlock_unlock (&yp_lock);
950
}
951 952 953 954 955 956


/* This is similar to yp_remove, but we force a touch
 * attempt */
void yp_touch (const char *mount)
{
957
    struct yp_server *server = (struct yp_server *)active_yps;
958
    ypdata_t *search_list = NULL;
959 960

    thread_rwlock_rlock (&yp_lock);
961 962 963
    if (server)
        search_list = server->mounts;

964 965
    while (server)
    {
966
        ypdata_t *yp = find_yp_mount (search_list, mount);
967 968
        if (yp)
        {
969
            /* we may of found old entries not purged yet, so skip them */
970
            if (yp->release != 0 || yp->remove != 0)
971 972 973 974
            {
                search_list = yp->next;
                continue;
            }
975 976 977
            /* don't update the directory if there is a touch scheduled soon */
            if (yp->process == do_yp_touch && now + yp->touch_interval - yp->next_update > 60)
                yp->next_update = now + 3;
978 979
        }
        server = server->next;
980 981
        if (server)
            search_list = server->mounts;
982 983 984 985 986
    }
    thread_rwlock_unlock (&yp_lock);
}


987
void yp_shutdown (void)
988 989 990
{
    yp_running = 0;
    yp_update = 1;
991 992
    if (yp_thread)
        thread_join (yp_thread);
993 994
    free ((char*)server_version);
    server_version = NULL;
995
    ICECAST_LOG_INFO("YP thread down");
996 997
}